chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r1423258 - in /incubator/chukwa/trunk: ./ conf/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/test/java/org/apache/hadoop/chukwa/extra...
Date: Tue, 18 Dec 2012 02:39:34 GMT
Author: asrabkin
Date: Tue Dec 18 02:39:30 2012
New Revision: 1423258

URL: http://svn.apache.org/viewvc?rev=1423258&view=rev
Log:
 CHUKWA-671. JSON processors for processing JMX data from Hadoop, HBase and ZooKeeper. Contributed by Shreyas Subramanya.
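
All six processors share the same pattern: the JMX adaptor is expected to hand each one a flat JSON object per chunk; the processor parses it with json-simple, converts counter-style metrics into per-interval deltas through a static rateMap, and adds each key to the ChukwaRecord backing one HBase column family. A minimal sketch of that parse loop follows; the input string and class name are illustrative and not part of this commit:

    import java.util.Map;

    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class JmxJsonSketch {
      public static void main(String[] args) {
        // Hypothetical JMX dump for a datanode; field names follow DatanodeProcessor below.
        String recordEntry = "{\"blocks_read\":120,\"gcCount\":7,\"ReceivedBytes\":4096}";
        JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
        for (Object o : obj.entrySet()) {
          Map.Entry entry = (Map.Entry) o;
          // The real processors route each key to one column family, e.g.
          // blocks_read -> "dn", gcCount -> "jvm", ReceivedBytes -> "rpc".
          System.out.println(entry.getKey() + " = " + entry.getValue());
        }
      }
    }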

Added:
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestJsonProcessors.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
    incubator/chukwa/trunk/conf/hbase.schema

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1423258&r1=1423257&r2=1423258&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Tue Dec 18 02:39:30 2012
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-671. JSON processors for processing JMX data from Hadoop, HBase and ZooKeeper. (Shreyas Subramanya via asrabkin)
+
     CHUKWA-635. Collect swap usage. (Eric Yang)
 
     CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)

Modified: incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-demux-conf.xml?rev=1423258&r1=1423257&r2=1423258&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-demux-conf.xml (original)
+++ incubator/chukwa/trunk/conf/chukwa-demux-conf.xml Tue Dec 18 02:39:30 2012
@@ -225,5 +225,35 @@
     <name>JobSummary</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.JobSummary</value>
    </property>
+
+   <property>
+    <name>DatanodeProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DatanodeProcessor</value>
+   </property>
+
+   <property>
+    <name>NamenodeProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.NamenodeProcessor</value>
+   </property>
+
+   <property>
+    <name>JobTrackerProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.JobTrackerProcessor</value>
+   </property>
+
+   <property>
+    <name>ZookeeperProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ZookeeperProcessor</value>
+   </property>
+
+   <property>
+    <name>HBaseMasterProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HBaseMasterProcessor</value>
+   </property>
+
+   <property>
+    <name>HBaseRegionServerProcessor</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HBaseRegionServerProcessor</value>
+   </property>
 </configuration>
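
In chukwa-demux-conf.xml each property name is the Chukwa data type of the incoming chunks and the value is the mapper class used to demux them, so the JMX adaptor feeding these processors is expected to tag its chunks with matching data types (for example, a chunk of type DatanodeProcessor is handled by org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DatanodeProcessor).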
  

Modified: incubator/chukwa/trunk/conf/hbase.schema
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/hbase.schema?rev=1423258&r1=1423257&r2=1423258&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/hbase.schema (original)
+++ incubator/chukwa/trunk/conf/hbase.schema Tue Dec 18 02:39:30 2012
@@ -38,3 +38,24 @@ create "chukwa", 
 {NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535},
 {NAME => "chukwaAgent_metrics", VERSION => 65535},
 {NAME => "chukwaAgent_httpSender", VERSION => 65535}
+create "HBase",
+{NAME => "master", VERSIONS => 65535},
+{NAME => "regionserver", VERSIONS => 65535}
+create "Namenode",
+{NAME => "summary", VERSIONS => 65535},
+{NAME => "hdfs", VERSIONS => 65535},
+{NAME => "rpc", VERSIONS => 65535},
+{NAME => "jvm", VERSIONS => 65535}
+create "Zookeeper",
+{NAME => "zk", VERSIONS => 65535}
+create "JobTracker",
+{NAME => "jt", VERSIONS => 65535},
+{NAME => "jvm", VERSIONS => 65535},
+{NAME => "rpc", VERSIONS => 65535}
+create "Datanode",
+{NAME => "dn", VERSIONS => 65535},
+{NAME => "jvm", VERSIONS => 65535},
+{NAME => "rpc", VERSIONS => 65535}
+
+
+
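
The new tables use the same HBase shell syntax as the rest of hbase.schema; assuming a standard HBase installation, the file can typically be replayed through the shell (for example, hbase shell conf/hbase.schema) to create them. The column families (master, regionserver, summary, hdfs, rpc, jvm, zk, jt, dn) match the strings the processors below pass to buildGenericRecord.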

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@Tables(annotations={
+@Table(name="Datanode",columnFamily="dn"),
+@Table(name="Datanode",columnFamily="jvm"),
+@Table(name="Datanode",columnFamily="rpc")
+})
+public class DatanodeProcessor extends AbstractProcessor{
+
+	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
+	
+	static {
+		long zero = 0L;
+		rateMap.put("blocks_verified", zero);
+		rateMap.put("blocks_written", zero);
+		rateMap.put("blocks_read", zero);
+		rateMap.put("bytes_written", zero);
+		rateMap.put("bytes_read", zero);
+		rateMap.put("heartBeats_num_ops", zero);		
+		rateMap.put("SentBytes", zero);
+		rateMap.put("ReceivedBytes", zero);
+		rateMap.put("rpcAuthorizationSuccesses", zero);
+		rateMap.put("rpcAuthorizationFailures", zero);
+		rateMap.put("RpcQueueTime_num_ops", zero);
+		rateMap.put("RpcProcessingTime_num_ops", zero);
+		rateMap.put("gcCount", zero);
+	}
+	
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		Logger log = Logger.getLogger(DatanodeProcessor.class); 
+		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();		
+		
+		final ChukwaRecord hdfs_datanode = new ChukwaRecord();
+		final ChukwaRecord datanode_jvm = new ChukwaRecord();
+		final ChukwaRecord datanode_rpc = new ChukwaRecord();
+		
+		Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
+			private static final long serialVersionUID = 1L;
+
+			{
+				put("blocks_verified", hdfs_datanode);
+				put("blocks_written", hdfs_datanode);
+				put("blocks_read", hdfs_datanode);
+				put("blocks_replicated", hdfs_datanode);
+				put("blocks_removed", hdfs_datanode);
+				put("bytes_written", hdfs_datanode);
+				put("bytes_read", hdfs_datanode);
+				put("heartBeats_avg_time", hdfs_datanode);
+				put("heartBeats_num_ops", hdfs_datanode);
+				
+				put("gcCount", datanode_jvm);
+				put("gcTimeMillis", datanode_jvm);
+				put("logError", datanode_jvm);
+				put("logFatal", datanode_jvm);
+				put("logInfo", datanode_jvm);
+				put("logWarn", datanode_jvm);
+				put("memHeapCommittedM", datanode_jvm);
+				put("memHeapUsedM", datanode_jvm);
+				put("threadsBlocked", datanode_jvm);
+				put("threadsNew", datanode_jvm);
+				put("threadsRunnable", datanode_jvm);
+				put("threadsTerminated", datanode_jvm);
+				put("threadsTimedWaiting", datanode_jvm);
+				put("threadsWaiting", datanode_jvm);
+
+				put("ReceivedBytes", datanode_rpc);				
+				put("RpcProcessingTime_avg_time", datanode_rpc);	
+				put("RpcProcessingTime_num_ops", datanode_rpc);	
+				put("RpcQueueTime_avg_time", datanode_rpc);	
+				put("RpcQueueTime_num_ops", datanode_rpc);	
+				put("SentBytes", datanode_rpc);	
+				put("rpcAuthorizationSuccesses", datanode_rpc);
+			}
+		};
+		try{
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for datanode");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();
+				String valueString = value == null?"":value.toString();	
+				
+				//Calculate rate for some of the metrics
+				if(rateMap.containsKey(key)){
+					long oldValue = rateMap.get(key);
+					long curValue = Long.parseLong(valueString);
+					rateMap.put(key, curValue);
+					long newValue = curValue - oldValue;
+					if(newValue < 0){
+						log.error("DatanodeProcessor's rateMap might be reset or corrupted for metric "+key);
+						newValue = 0L;
+					}					
+					valueString = Long.toString(newValue);
+				}
+				
+				if(metricsMap.containsKey(key)){
+					ChukwaRecord rec = metricsMap.get(key);
+					rec.add(key, valueString);
+				}				
+			}
+			buildGenericRecord(hdfs_datanode, null, timeStamp, "dn");
+			output.collect(key, hdfs_datanode);
+			buildGenericRecord(datanode_jvm, null, timeStamp, "jvm");
+			output.collect(key, datanode_jvm);
+			buildGenericRecord(datanode_rpc, null, timeStamp, "rpc");
+			output.collect(key, datanode_rpc);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}		
+	}	
+}
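
A note on the rate calculation above: rateMap is a static, JVM-wide map initialized to zero, so the first chunk processed after a demux task starts stores the absolute counter value rather than a true delta. With illustrative numbers, if blocks_read is reported as 1000 and then 1120 in consecutive chunks, the emitted values are 1000 (1000 - 0) and then 120 (1120 - 1000); a counter reset would produce a negative delta, which the code logs and clamps to zero.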

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.record.Buffer;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@Tables(annotations={
+@Table(name="HBase",columnFamily="master")
+})
+public class HBaseMasterProcessor extends AbstractProcessor{
+	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
+	static {
+		long zero = 0L;	
+		rateMap.put("splitSizeNumOps", zero);
+		rateMap.put("splitTimeNumOps", zero);
+	}
+	
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		
+		Logger log = Logger.getLogger(HBaseMasterProcessor.class); 
+		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+		ChukwaRecord record = new ChukwaRecord();
+		
+		Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
+
+		try{
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for hbase master");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();
+				String valueString = value == null?"":value.toString();	
+				
+				//Calculate rate for some of the metrics
+				if(rateMap.containsKey(key)){
+					long oldValue = rateMap.get(key);
+					long curValue = Long.parseLong(valueString);
+					rateMap.put(key, curValue);
+					long newValue = curValue - oldValue;
+					if(newValue < 0){
+						log.warn("HBaseMaster rateMap might be reset or corrupted for metric "+key);						
+						newValue = 0L;
+					}					
+					valueString = Long.toString(newValue);
+				}
+				
+				Buffer b = new Buffer(valueString.getBytes());
+				metricsMap.put(key,b);				
+			}			
+			
+			TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
+			record.setMapFields(t);			
+			buildGenericRecord(record, null, timeStamp, "master");
+			output.collect(key, record);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}		
+	}	
+}

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.record.Buffer;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@Tables(annotations={
+@Table(name="HBase",columnFamily="regionserver")
+})
+public class HBaseRegionServerProcessor extends AbstractProcessor{
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		
+		Logger log = Logger.getLogger(HBaseRegionServerProcessor.class); 
+		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+		ChukwaRecord record = new ChukwaRecord();
+		
+		Map<String, Buffer> metricsMap = new HashMap<String,Buffer>();
+
+		try{
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for hbase region server");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();
+				String valueString = value == null?"":value.toString();	
+				Buffer b = new Buffer(valueString.getBytes());
+				metricsMap.put(key,b);						
+			}			
+			
+			TreeMap<String, Buffer> t = new TreeMap<String, Buffer>(metricsMap);
+			record.setMapFields(t);
+			buildGenericRecord(record, null, timeStamp, "regionserver");			
+			output.collect(key, record);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}		
+	}	
+}

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+
+@Tables(annotations={
+@Table(name="JobTracker",columnFamily="jt"),
+@Table(name="JobTracker",columnFamily="jvm"),
+@Table(name="JobTracker",columnFamily="rpc")
+})
+public class JobTrackerProcessor extends AbstractProcessor{
+	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
+	static {
+		long zero = 0L;	
+		rateMap.put("SentBytes", zero);
+		rateMap.put("ReceivedBytes", zero);
+		rateMap.put("rpcAuthorizationSuccesses", zero);
+		rateMap.put("rpcAuthorizationFailures", zero);
+		rateMap.put("RpcQueueTime_num_ops", zero);
+		rateMap.put("RpcProcessingTime_num_ops", zero);
+		rateMap.put("heartbeats", zero);
+		rateMap.put("jobs_submitted", zero);
+		rateMap.put("jobs_completed", zero);
+		rateMap.put("jobs_failed", zero);
+		rateMap.put("jobs_killed", zero);
+		rateMap.put("maps_launched", zero);
+		rateMap.put("maps_completed", zero);
+		rateMap.put("maps_failed", zero);
+		rateMap.put("maps_killed", zero);
+		rateMap.put("reduces_launched", zero);
+		rateMap.put("reduces_completed", zero);
+		rateMap.put("reduces_failed", zero);
+		rateMap.put("reduces_killed", zero);
+		rateMap.put("gcCount", zero);
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		Logger log = Logger.getLogger(JobTrackerProcessor.class); 
+		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+		
+		final ChukwaRecord mapred_jt = new ChukwaRecord();
+		final ChukwaRecord jt_jvm = new ChukwaRecord();
+		final ChukwaRecord jt_rpc = new ChukwaRecord();
+		
+		Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
+			private static final long serialVersionUID = 1L;
+			{
+				put("gcCount", jt_jvm);
+				put("gcTimeMillis", jt_jvm);
+				put("logError", jt_jvm);
+				put("logFatal", jt_jvm);
+				put("logInfo", jt_jvm);
+				put("logWarn", jt_jvm);
+				put("memHeapCommittedM", jt_jvm);
+				put("memHeapUsedM", jt_jvm);
+				put("threadsBlocked", jt_jvm);
+				put("threadsNew", jt_jvm);
+				put("threadsRunnable", jt_jvm);
+				put("threadsTerminated", jt_jvm);
+				put("threadsTimedWaiting", jt_jvm);
+				put("threadsWaiting", jt_jvm);
+
+				put("ReceivedBytes", jt_rpc);				
+				put("RpcProcessingTime_avg_time", jt_rpc);	
+				put("RpcProcessingTime_num_ops", jt_rpc);	
+				put("RpcQueueTime_avg_time", jt_rpc);	
+				put("RpcQueueTime_num_ops", jt_rpc);	
+				put("SentBytes", jt_rpc);	
+				put("rpcAuthorizationSuccesses", jt_rpc);	
+				put("rpcAuthorizationFailures", jt_rpc);	
+			}
+		};		
+		try{
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();
+				String valueString = value == null?"":value.toString();	
+				
+				//Calculate rate for some of the metrics
+				if(rateMap.containsKey(key)){
+					long oldValue = rateMap.get(key);
+					long curValue = Long.parseLong(valueString);
+					rateMap.put(key, curValue);
+					long newValue = curValue - oldValue;
+					if(newValue < 0){
+						log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "+key);						
+						newValue = 0L;
+					}					
+					valueString = Long.toString(newValue);
+				}
+				
+				//These metrics are string types with JSON structure. So we parse them and get the count
+				if(key.indexOf("Json") >= 0){	
+					//ignore these for now. Parsing of JSON array is throwing class cast exception. 
+				}	
+				else if(metricsMap.containsKey(key)){
+					ChukwaRecord rec = metricsMap.get(key);
+					rec.add(key, valueString);
+				}
+				else {
+					mapred_jt.add(key, valueString);
+				}
+			}			
+			
+			buildGenericRecord(mapred_jt, null, timeStamp, "jt");
+			output.collect(key, mapred_jt);
+			buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
+			output.collect(key, jt_jvm);
+			buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
+			output.collect(key, jt_rpc);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}
+	}
+}
+

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@Tables(annotations={
+@Table(name="Namenode",columnFamily="summary"),
+@Table(name="Namenode",columnFamily="hdfs"),
+@Table(name="Namenode",columnFamily="rpc"),
+@Table(name="Namenode",columnFamily="jvm")
+})
+public class NamenodeProcessor extends AbstractProcessor{
+	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
+	
+	static {
+		long zero = 0L;
+		rateMap.put("AddBlockOps", zero);
+		rateMap.put("CreateFileOps", zero);
+		rateMap.put("DeleteFileOps", zero);
+		rateMap.put("FileInfoOps", zero);
+		rateMap.put("FilesAppended", zero);
+		rateMap.put("FilesCreated", zero);
+		rateMap.put("FilesDeleted", zero);
+		rateMap.put("FileInGetListingOps", zero);
+		rateMap.put("FilesRenamed", zero);
+		rateMap.put("GetBlockLocations", zero);
+		rateMap.put("GetListingOps", zero);
+		rateMap.put("SentBytes", zero);
+		rateMap.put("ReceivedBytes", zero);
+		rateMap.put("rpcAuthorizationSuccesses", zero);
+		rateMap.put("rpcAuthorizationFailures", zero);
+		rateMap.put("RpcQueueTime_num_ops", zero);
+		rateMap.put("RpcProcessingTime_num_ops", zero);
+		rateMap.put("gcCount", zero);
+	}
+	
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		try{
+			Logger log = Logger.getLogger(NamenodeProcessor.class); 
+			long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+		
+			final ChukwaRecord hdfs_overview = new ChukwaRecord();
+			final ChukwaRecord hdfs_namenode = new ChukwaRecord();
+			final ChukwaRecord namenode_jvm = new ChukwaRecord();
+			final ChukwaRecord namenode_rpc = new ChukwaRecord();
+		
+			Map<String, ChukwaRecord> metricsMap = new HashMap<String,ChukwaRecord>(){
+				private static final long serialVersionUID = 1L;	
+				{					
+					put("BlockCapacity", hdfs_overview);
+					put("BlocksTotal", hdfs_overview);
+					put("CapacityTotalGB", hdfs_overview);
+					put("CapacityUsedGB", hdfs_overview);
+					put("CapacityRemainingGB", hdfs_overview);
+					put("CorruptBlocks", hdfs_overview);
+					put("ExcessBlocks", hdfs_overview);
+					put("FilesTotal", hdfs_overview);
+					put("MissingBlocks", hdfs_overview);
+					put("PendingDeletionBlocks", hdfs_overview);
+					put("PendingReplicationBlocks", hdfs_overview);
+					put("ScheduledReplicationBlocks", hdfs_overview);
+					put("TotalLoad", hdfs_overview);
+					put("UnderReplicatedBlocks", hdfs_overview);
+					
+					put("gcCount", namenode_jvm);
+					put("gcTimeMillis", namenode_jvm);
+					put("logError", namenode_jvm);
+					put("logFatal", namenode_jvm);
+					put("logInfo", namenode_jvm);
+					put("logWarn", namenode_jvm);
+					put("memHeapCommittedM", namenode_jvm);
+					put("memHeapUsedM", namenode_jvm);
+					put("threadsBlocked", namenode_jvm);
+					put("threadsNew", namenode_jvm);
+					put("threadsRunnable", namenode_jvm);
+					put("threadsTerminated", namenode_jvm);
+					put("threadsTimedWaiting", namenode_jvm);
+					put("threadsWaiting", namenode_jvm);
+	
+					put("ReceivedBytes", namenode_rpc);				
+					put("RpcProcessingTime_avg_time", namenode_rpc);	
+					put("RpcProcessingTime_num_ops", namenode_rpc);	
+					put("RpcQueueTime_avg_time", namenode_rpc);	
+					put("RpcQueueTime_num_ops", namenode_rpc);	
+					put("SentBytes", namenode_rpc);	
+					put("rpcAuthorizationSuccesses", namenode_rpc);	
+					put("rpcAuthenticationFailures", namenode_rpc);	
+					put("rpcAuthenticationSuccesses", namenode_rpc);	
+				}
+			};	
+		
+		
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for namenode");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();	
+				String valueString = (value == null)?"":value.toString();
+				
+				//These metrics are string types with JSON structure. So we parse them and get the count
+				if(key.equals("LiveNodes") || key.equals("DeadNodes") || key.equals("DecomNodes") || key.equals("NameDirStatuses")){					
+					JSONObject jobj = (JSONObject) JSONValue.parse(valueString);
+					valueString = Integer.toString(jobj.size());
+				}	
+				
+				//Calculate rate for some of the metrics
+				if(rateMap.containsKey(key)){
+					long oldValue = rateMap.get(key);
+					long curValue = Long.parseLong(valueString);
+					rateMap.put(key, curValue);
+					long newValue = curValue - oldValue;
+					if(newValue < 0){
+						log.error("NamenodeProcessor's rateMap might be reset or corrupted for metric "+key);
+						newValue = 0L;
+					}					
+					valueString = Long.toString(newValue);
+				}
+				
+				//Check if metric belongs to one of the categories in metricsMap. If not just write it in group Hadoop.HDFS.NameNode
+				if(metricsMap.containsKey(key)){
+					ChukwaRecord rec = metricsMap.get(key);
+					rec.add(key, valueString);					
+				}	
+				else{
+					hdfs_namenode.add(key, valueString);
+				}
+			}			
+			buildGenericRecord(hdfs_overview, null, timeStamp, "summary");			
+			output.collect(key, hdfs_overview);
+			buildGenericRecord(hdfs_namenode, null, timeStamp, "hdfs");
+			output.collect(key, hdfs_namenode);
+			buildGenericRecord(namenode_jvm, null, timeStamp, "jvm");
+			output.collect(key, namenode_jvm);
+			buildGenericRecord(namenode_rpc, null, timeStamp, "rpc");
+			output.collect(key, namenode_rpc);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));			
+		}
+		
+	}
+	
+}
+

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@Tables(annotations={
+@Table(name="Zookeeper",columnFamily="zk")
+})
+public class ZookeeperProcessor extends AbstractProcessor{
+
+	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
+	static {
+		long zero = 0L;	
+		rateMap.put("PacketsSent", zero);
+		rateMap.put("PacketsReceived", zero);
+	}
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		Logger log = Logger.getLogger(ZookeeperProcessor.class); 
+		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+		final ChukwaRecord record = new ChukwaRecord();
+		
+		Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
+			private static final long serialVersionUID = 1L;
+
+			{
+				put("MinRequestLatency", record);
+				put("AvgRequestLatency", record);
+				put("MaxRequestLatency", record);
+				put("PacketsReceived", record);
+				put("PacketsSent", record);			
+				put("OutstandingRequests", record);
+				put("NodeCount", record);
+				put("WatchCount", record);
+			}
+		};
+		try{
+			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
+			String ttTag = chunk.getTag("timeStamp");
+			if(ttTag == null){
+				log.warn("timeStamp tag not set in JMX adaptor for zookeeper");
+			}
+			else{
+				timeStamp = Long.parseLong(ttTag);
+			}
+			Iterator<JSONObject> iter = obj.entrySet().iterator();
+			
+			while(iter.hasNext()){
+				Map.Entry entry = (Map.Entry)iter.next();
+				String key = (String) entry.getKey();
+				Object value = entry.getValue();
+				String valueString = value == null?"":value.toString();	
+				
+				if(metricsMap.containsKey(key)){
+					ChukwaRecord rec = metricsMap.get(key);
+					rec.add(key, valueString);
+				}				
+			}			
+			
+			buildGenericRecord(record, null, timeStamp, "zk");
+			output.collect(key, record);
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}
+	}
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailer.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,55 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestFileTailer {
+	private ChukwaAgent agent;
+	private String adaptorId;
+	private File testFile;
+
+	@After
+	public void tearDown() throws Exception {
+		agent.stopAdaptor(adaptorId, false);
+		agent.shutdown();
+		if (testFile != null) {
+			testFile.delete();
+		}
+	}
+
+	@Test
+	public void testDontSleepIfHasMoreData() throws AlreadyRunningException, IOException, InterruptedException {
+		ChukwaConfiguration cc = new ChukwaConfiguration();
+		cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 18); // small in order to have hasMoreData=true
+																	 // (with 26 letters we should have 2 chunks)
+		agent = new ChukwaAgent(cc);
+		
+		ChunkCatcherConnector chunks = new ChunkCatcherConnector();
+	    chunks.start();
+
+	    File baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+		testFile = makeTestFile("testDontSleepIfHasMoreData", 1, baseDir); // insert 26 letters on file
+		long startTime = System.currentTimeMillis();
+		adaptorId = agent.processAddCommand("add adaptor_test ="
+				+ "filetailer.FileTailingAdaptor testDontSleepIfHasMoreData "
+				+ testFile.getCanonicalPath() + " 0");
+
+		chunks.waitForAChunk();
+		chunks.waitForAChunk();
+		
+		long endTime = System.currentTimeMillis();
+		assertTrue( endTime - startTime < 300 ); // ensure that everything finishes very fast
+												 // faster than SAMPLE_PERIOD_MS (ie: we don't sleep)
+	}
+
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestJsonProcessors.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestJsonProcessors.java?rev=1423258&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestJsonProcessors.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestJsonProcessors.java Tue Dec 18 02:39:30 2012
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import junit.framework.TestCase;
+
+public class TestJsonProcessors extends TestCase {
+
+	/**
+	 * Process the chunk with the passed Processor and compare with the input
+	 * JSONObject
+	 * 
+	 * @param p
+	 * @param inData
+	 * @param chunk
+	 * @return
+	 */
+	private String testProcessor(AbstractProcessor p, JSONObject inData,
+			Chunk chunk) {
+		ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output = new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+		p.process(new ChukwaArchiveKey(), chunk, output, null);
+		HashMap<ChukwaRecordKey, ChukwaRecord> outData = output.data;
+
+		// First get all ChukwaRecords and then get all field-data pairs within
+		// each record
+		Iterator<Entry<ChukwaRecordKey, ChukwaRecord>> recordIter = outData
+				.entrySet().iterator();
+		while (recordIter.hasNext()) {
+			Entry<ChukwaRecordKey, ChukwaRecord> recordEntry = recordIter
+					.next();
+			ChukwaRecord value = recordEntry.getValue();
+			String[] fields = value.getFields();
+			for (String field : fields) {
+				//ignore ctags
+				if(field.equals("ctags")) {
+					continue;
+				}
+				String data = value.getValue(field);
+				String expected = String.valueOf(inData.get(field));
+				/*System.out.println("Metric, expected data, received data- " +
+				 field + ", " + expected + ", " +data);
+				*/
+				if (!expected.equals(data)) {
+					StringBuilder sb = new StringBuilder(
+							"Failed to verify metric - ");
+					sb.append("field:").append(field);
+					sb.append(", expected:").append(expected);
+					sb.append(", but received:").append(data);
+					return sb.toString();
+				}
+			}
+		}
+		return null;
+	}
+
+	@SuppressWarnings("unchecked")
+	private JSONObject getJSONObject(){
+		String csource = "localhost";
+		try {
+			csource = InetAddress.getLocalHost().getHostName();
+		} catch (UnknownHostException e) {
+			csource = "localhost";
+		}
+		JSONObject json = new JSONObject();
+		json.put("capp", "Test");
+		json.put("csource", csource);
+		return json;
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testJobTrackerProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("memHeapUsedM", "286");
+		json.put("maps_killed", "3");
+		json.put("waiting_maps", "1");
+		json.put("RpcProcessingTime_avg_time", "0.003");
+		byte[] data = json.toString().getBytes();
+		JobTrackerProcessor p = new JobTrackerProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+
+		// test gauge metric
+		json.put("maps_killed", "5");
+		data = json.toString().getBytes();
+		ch = new ChunkImpl("TestType", "Test", data.length, data, null);
+		json.put("maps_killed", "2");
+		failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testNamenodeProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("BlocksTotal", "1234");
+		json.put("FilesCreated", "33");
+		json.put("RpcQueueTime_avg_time", "0.001");
+		json.put("gcCount", "112");
+		json.put("Transactions_num_ops", "3816");
+		byte[] data = json.toString().getBytes();
+		NamenodeProcessor p = new NamenodeProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+
+		// test gauge metric
+		json.put("FilesCreated", "55");
+		json.put("gcCount", "115");
+		data = json.toString().getBytes();
+		ch = new ChunkImpl("TestType", "Test", data.length, data, null);
+		json.put("FilesCreated", "22");
+		json.put("gcCount", "3");
+		failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testDatanodeProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("heartBeats_num_ops", "10875");
+		json.put("FilesCreated", "33");
+		json.put("RpcQueueTime_avg_time", "0.001");
+		json.put("gcCount", "112");
+		json.put("Capacity", "22926269645");
+		byte[] data = json.toString().getBytes();
+		DatanodeProcessor p = new DatanodeProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+
+		// test gauge metric
+		json.put("heartBeats_num_ops", "10980");
+		json.put("gcCount", "115");
+		data = json.toString().getBytes();
+		ch = new ChunkImpl("TestType", "Test", data.length, data, null);
+		json.put("heartBeats_num_ops", "105");
+		json.put("gcCount", "3");
+		failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testHBaseMasterProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("splitSizeNumOps", "108");
+		json.put("AverageLoad", "3.33");
+		byte[] data = json.toString().getBytes();
+		HBaseMasterProcessor p = new HBaseMasterProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+
+		// test gauge metric
+		json.put("splitSizeNumOps", "109");
+		data = json.toString().getBytes();
+		ch = new ChunkImpl("TestType", "Test", data.length, data, null);
+		json.put("splitSizeNumOps", "1");
+		failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testHBaseRegionServerProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("blockCacheSize", "2681872");
+		byte[] data = json.toString().getBytes();
+		HBaseRegionServerProcessor p = new HBaseRegionServerProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+		// no gauge metrics yet
+	}
+
+	@SuppressWarnings("unchecked")
+	public void testZookeeperProcessor() {
+		// test metric for each record type
+		JSONObject json = getJSONObject();
+		json.put("PacketsSent", "2049");
+		json.put("NodeCount", "40");
+		byte[] data = json.toString().getBytes();
+		ZookeeperProcessor p = new ZookeeperProcessor();
+		ChunkImpl ch = new ChunkImpl("TestType", "Test", data.length, data,
+				null);
+		String failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+
+		// test gauge metric
+		json.put("PacketsSent", "2122");
+		data = json.toString().getBytes();
+		ch = new ChunkImpl("TestType", "Test", data.length, data, null);
+		json.put("PacketsSent", "73");
+		failMsg = testProcessor(p, json, ch);
+		assertNull(failMsg, failMsg);
+	}
+}
+
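
The gauge checks above rely on each processor keeping its static rateMap across calls: the first chunk stores the absolute counter and the test expects that absolute value back, while the second chunk carries a larger counter and the test expects only the difference (for instance, splitSizeNumOps reported as 108 and then 109 is verified as 108 followed by 1).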


