chukwa-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [1/4] chukwa git commit: CHUKWA-744. Implemented new parsers for extract and transform data to HBase format. (Eric Yang)
Date Mon, 27 Apr 2015 00:37:43 GMT
Repository: chukwa
Updated Branches:
  refs/heads/master 6def7b64d -> a6e0cbad7


CHUKWA-744.  Implemented new parsers for extract and transform data to HBase format.  (Eric
Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/f9dea324
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/f9dea324
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/f9dea324

Branch: refs/heads/master
Commit: f9dea324bd532478e81ca4a4fece26ec42119b6f
Parents: 6def7b6
Author: Eric Yang <eyang@apache.org>
Authored: Sat Apr 18 11:54:56 2015 -0700
Committer: Eric Yang <eyang@apache.org>
Committed: Sat Apr 18 11:54:56 2015 -0700

----------------------------------------------------------------------
 .../extraction/hbase/AbstractProcessor.java     | 135 +++++++++++++
 .../hbase/ChukwaMetricsProcessor.java           |  59 ++++++
 .../extraction/hbase/DefaultProcessor.java      |  50 +++++
 .../hbase/HadoopMetricsProcessor.java           |  86 ++++++++
 .../chukwa/extraction/hbase/LogEntry.java       |  64 ++++++
 .../extraction/hbase/ProcessorFactory.java      |  55 +++++
 .../chukwa/extraction/hbase/SystemMetrics.java  | 200 +++++++++++++++++++
 .../hbase/UnknownRecordTypeException.java       |  44 ++++
 .../apache/hadoop/chukwa/util/HBaseUtil.java    |  62 ++++++
 9 files changed, 755 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
new file mode 100644
index 0000000..b39c789
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for the HBase extraction processors. {@link #process} binds a
+ * chunk, its output list and a reporter to this instance, lets the subclass
+ * {@link #parse} the payload into {@link Put}s, then appends one metadata put
+ * carrying the chunk's tags.
+ *
+ * <p>Per-chunk state is kept in instance fields, so one instance must not
+ * process two chunks concurrently.
+ */
+public abstract class AbstractProcessor {
+  static Logger LOG = Logger.getLogger(AbstractProcessor.class);
+
+  protected int entryCount = 0;
+  protected String primaryKeyHelper;
+  protected String sourceHelper;
+
+  protected byte[] key = null;
+  /** Column family for metric cells. */
+  byte[] CF = "t".getBytes();
+
+  boolean chunkInErrorSaved = false;
+  ArrayList<Put> output = null;
+  ArrayList<Put> meta = null;
+  Reporter reporter = null;
+  /**
+   * Timestamp used for row keys, column qualifiers and cell versions. It is
+   * reset to the current time at the start of every {@link #process} call,
+   * because instances are reused (ProcessorFactory caches one per class) and
+   * would otherwise keep their construction time forever. Subclasses that
+   * read a timestamp from the record overwrite it in {@link #parse}.
+   */
+  long time = System.currentTimeMillis();
+  Chunk chunk = null;
+  MessageDigest md5 = null;
+
+  public AbstractProcessor() throws NoSuchAlgorithmException {
+    md5 = MessageDigest.getInstance("md5");
+  }
+
+  /**
+   * Parses one chunk payload and appends the resulting puts to the bound
+   * output list. Called by {@link #process} after the chunk, output list and
+   * reporter have been bound to this instance.
+   *
+   * @param recordEntry raw chunk data
+   * @throws Throwable if the payload cannot be parsed
+   */
+  protected abstract void parse(byte[] recordEntry) throws Throwable;
+
+  /**
+   * Adds one metric cell with an explicit timestamp and source. The row key
+   * is built from {@code time}, {@code <dataType>.<metric>} and {@code source};
+   * both the primary key and the source are registered with the reporter.
+   *
+   * @param time timestamp for the row key, qualifier and cell version
+   * @param metric metric name, appended to the data type to form the primary key
+   * @param source source the metric belongs to
+   * @param value cell value
+   * @param output list the new put is appended to
+   */
+  public void addRecord(long time, String metric, String source, byte[] value,
+      ArrayList<Put> output) {
+    String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+        .append(metric).toString();
+    byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(CF, timeInBytes, time, value);
+    output.add(put);
+    reporter.putMetric(chunk.getDataType(), primaryKey);
+    reporter.putSource(chunk.getDataType(), source);
+  }
+
+  /**
+   * String-valued convenience overload of {@link #addRecord(String, byte[])};
+   * the value is encoded with the platform default charset.
+   *
+   * @param primaryKey metric name, appended to the data type
+   * @param value cell value
+   */
+  public void addRecord(String primaryKey, String value) {
+    addRecord(primaryKey, value.getBytes());
+  }
+
+  /**
+   * Adds one metric cell using the current {@link #time} and the chunk's
+   * source. Only the primary key is built here ({@code <dataType>.<metric>});
+   * the put is appended to the bound output list and the primary key is
+   * registered with the reporter.
+   *
+   * @param metric metric name, appended to the data type to form the primary key
+   * @param value cell value
+   */
+  public void addRecord(String metric, byte[] value) {
+    String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+        .append(metric).toString();
+    byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(CF, timeInBytes, time, value);
+    output.add(put);
+    reporter.putMetric(chunk.getDataType(), primaryKey);
+  }
+
+  /**
+   * Processes a chunk to store in HBase: binds the chunk, output list and
+   * reporter, registers the chunk source, parses the payload and finally
+   * appends a metadata put holding the chunk tags.
+   *
+   * @param chunk chunk to process
+   * @param output list the generated puts are appended to
+   * @param reporter receives metric/source registrations
+   * @throws Throwable whatever {@link #parse} throws
+   */
+  public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+      throws Throwable {
+    this.output = output;
+    this.reporter = reporter;
+    this.chunk = chunk;
+    this.primaryKeyHelper = chunk.getDataType();
+    this.sourceHelper = chunk.getSource();
+    // Refresh the timestamp for this chunk; parse() may override it with a
+    // timestamp read from the record itself.
+    this.time = System.currentTimeMillis();
+    reporter.putSource(primaryKeyHelper, sourceHelper);
+    parse(chunk.getData());
+    addMeta();
+  }
+
+  /**
+   * Appends one put to column family "a" whose value is the chunk's tags,
+   * keyed by the current time, data type and source.
+   */
+  protected void addMeta() {
+    byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
+    Put put = new Put(key);
+    String family = "a";
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+    output.add(put);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
new file mode 100644
index 0000000..156d9d5
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public class ChukwaMetricsProcessor extends HadoopMetricsProcessor {
+  static Logger LOG = Logger.getLogger(ChukwaMetricsProcessor.class);
+
+  public ChukwaMetricsProcessor() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  /**
+   * Registers the chunk's source and its "cluster" tag with the reporter,
+   * then parses the payload as a Hadoop metrics record and appends the
+   * resulting puts, followed by one metadata put, to {@code output}.
+   *
+   * @param chunk chunk to process
+   * @param output list the generated puts are appended to
+   * @param reporter receives source and cluster-name registrations
+   * @throws Throwable whatever parsing throws
+   */
+  @Override
+  public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+      throws Throwable {
+    this.output = output;
+    this.reporter = reporter;
+    this.chunk = chunk;
+
+    String dataType = chunk.getDataType();
+    String source = chunk.getSource();
+    String cluster = chunk.getTag("cluster");
+    this.primaryKeyHelper = dataType;
+    this.sourceHelper = source;
+
+    reporter.putSource(dataType, source);
+    reporter.putClusterName(dataType, cluster);
+
+    parse(chunk.getData());
+    addMeta();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
new file mode 100644
index 0000000..2da64a3
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+/**
+ * Fallback processor for data types without a dedicated parser: the whole
+ * chunk payload is stored as one cell, and the reporter is told the record
+ * type is "unknown".
+ */
+public class DefaultProcessor extends AbstractProcessor {
+
+  static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+
+  public DefaultProcessor() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  /**
+   * Stores the raw chunk data under a row keyed by the current time, data
+   * type and source, and reports the row key as the record signature.
+   *
+   * @param recordEntry raw chunk data (the stored value is read from the
+   *          bound chunk, which {@code process} passes here unchanged)
+   * @throws Throwable if building or reporting the record fails
+   */
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    // The payload carries no timestamp of its own, and processor instances are
+    // cached and reused, so stamp each record with the processing time instead
+    // of whatever time was left over from construction or a previous chunk.
+    time = System.currentTimeMillis();
+    byte[] rowKey = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
+    Put put = new Put(rowKey);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add("t".getBytes(), timeInBytes, chunk.getData());
+    output.add(put);
+    JSONObject json = new JSONObject();
+    // A raw byte[] would serialize as its identity hash (e.g. "[B@1b6d3586");
+    // hex-encode the row key so the signature is stable and readable.
+    // NOTE(review): confirm the consumer of "sig" expects this encoding.
+    json.put("sig", toHex(rowKey));
+    json.put("type", "unknown");
+    reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+  }
+
+  /** Renders bytes as lowercase hexadecimal, two digits per byte. */
+  private static String toHex(byte[] bytes) {
+    StringBuilder sb = new StringBuilder(bytes.length * 2);
+    for (byte b : bytes) {
+      sb.append(String.format("%02x", b & 0xff));
+    }
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
new file mode 100644
index 0000000..3afd71a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+/**
+ * Processor for Hadoop metrics records whose payload contains a JSON object
+ * with "timestamp", "contextName" and "recordName" fields plus one field per
+ * metric.
+ */
+public class HadoopMetricsProcessor extends AbstractProcessor {
+
+  static Logger LOG = Logger.getLogger(HadoopMetricsProcessor.class);
+  static final String timestampField = "timestamp";
+  static final String contextNameField = "contextName";
+  static final String recordNameField = "recordName";
+  /** Column family for metric cells. */
+  static final byte[] cf = "t".getBytes();
+
+  public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
+  }
+
+  /**
+   * Parses a Hadoop metrics record. Everything from the first '{' onwards is
+   * read as a JSON object; its "timestamp" becomes the record time, and every
+   * other non-null field becomes one cell keyed by
+   * {@code <contextName>.<recordName>.<field>} and the chunk source.
+   *
+   * @param recordEntry raw chunk data
+   * @throws Throwable (after logging it) if the payload is not in the
+   *           expected format
+   */
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    String body = new String(recordEntry);
+    try {
+      int start = body.indexOf('{');
+      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
+
+      time = ((Long) json.get(timestampField)).longValue();
+      String contextName = (String) json.get(contextNameField);
+      String recordName = (String) json.get(recordNameField);
+      byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+
+      @SuppressWarnings("unchecked")
+      Iterator<String> ki = json.keySet().iterator();
+      while (ki.hasNext()) {
+        String keyName = ki.next();
+        if (isHeaderField(keyName)) {
+          // Header fields describe the record itself; they are not metrics.
+          continue;
+        }
+        Object value = json.get(keyName);
+        if (value != null) {
+          byte[] v = value.toString().getBytes();
+          String primaryKey = new StringBuilder(contextName).append(".")
+              .append(recordName).append(".").append(keyName).toString();
+          byte[] rowKey = HBaseUtil.buildKey(time, primaryKey, chunk.getSource());
+          Put r = new Put(rowKey);
+          r.add(cf, timeInBytes, time, v);
+          output.add(r);
+        }
+      }
+    } catch (Exception e) {
+      // Log the decoded payload; concatenating the byte[] itself would only
+      // print its identity hash.
+      LOG.warn("Wrong format in HadoopMetricsProcessor [" + body + "]", e);
+      throw e;
+    }
+  }
+
+  /** Returns true for the record-level fields that are not emitted as metrics. */
+  private static boolean isHeaderField(String keyName) {
+    return timestampField.equals(keyName)
+        || contextNameField.equals(keyName)
+        || recordNameField.equals(keyName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
new file mode 100644
index 0000000..dcbe2d4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * One parsed log line of the form
+ * {@code <23-char timestamp> <level> <class>: <message>}.
+ */
+public class LogEntry {
+	/**
+	 * Pattern covering all 23 characters of the timestamp prefix, e.g.
+	 * "2015-04-18 11:54:56,123". NOTE(review): presumably log4j's ISO8601
+	 * layout; confirm against the producing appender.
+	 */
+	private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";
+	private static final int DATE_LENGTH = 23;
+
+	private Date date;
+	private String logLevel;
+	private String className;
+	private String body;
+
+	/**
+	 * Splits a log line into timestamp, level, class name and message.
+	 *
+	 * @param recordEntry a single log line
+	 * @throws ParseException if the timestamp cannot be parsed or the line is
+	 *           too short or missing the level/class separators
+	 */
+	public LogEntry(String recordEntry) throws ParseException {
+		if (recordEntry.length() < DATE_LENGTH) {
+			throw new ParseException("Log entry too short: " + recordEntry,
+					recordEntry.length());
+		}
+		// SimpleDateFormat is not thread-safe, so use a fresh instance per
+		// entry rather than a shared static one.
+		SimpleDateFormat sdf = new SimpleDateFormat(DATE_PATTERN);
+		date = sdf.parse(recordEntry.substring(0, DATE_LENGTH));
+
+		int start = DATE_LENGTH + 1;
+		int idx = recordEntry.indexOf(' ', start);
+		if (idx < 0) {
+			throw new ParseException("Missing log level: " + recordEntry, start);
+		}
+		logLevel = recordEntry.substring(start, idx);
+
+		start = idx + 1;
+		idx = recordEntry.indexOf(' ', start);
+		if (idx <= start) {
+			throw new ParseException("Missing class name: " + recordEntry, start);
+		}
+		// Drop the character right before the space (the ':' that follows the
+		// class name).
+		className = recordEntry.substring(start, idx - 1);
+		body = recordEntry.substring(idx + 1);
+	}
+
+	public Date getDate() {
+		return date;
+	}
+
+	public void setDate(Date date) {
+		this.date = date;
+	}
+
+	public String getLogLevel() {
+		return logLevel;
+	}
+
+	public String getClassName() {
+		return className;
+	}
+
+	public String getBody() {
+		return body;
+	}
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
new file mode 100644
index 0000000..96931d7
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+import java.util.HashMap;
+import org.apache.log4j.Logger;
+
+/**
+ * Looks up {@link AbstractProcessor} implementations by class name,
+ * instantiating each one on first use and handing out the same instance on
+ * every later request.
+ */
+public class ProcessorFactory {
+  static Logger log = Logger.getLogger(ProcessorFactory.class);
+
+  /** Registry of already-built processors, keyed by class name. */
+  private static HashMap<String, AbstractProcessor> processors =
+      new HashMap<String, AbstractProcessor>();
+
+  public ProcessorFactory() {
+  }
+
+  /**
+   * Returns the cached processor for {@code parserClass}, creating it through
+   * its public no-arg constructor if this is the first request.
+   *
+   * @param parserClass fully-qualified class name of the processor
+   * @return the (shared) processor instance
+   * @throws UnknownRecordTypeException if the class cannot be found or
+   *           constructed as an AbstractProcessor
+   */
+  public static AbstractProcessor getProcessor(String parserClass)
+      throws UnknownRecordTypeException {
+    AbstractProcessor cached = processors.get(parserClass);
+    if (cached != null) {
+      return cached;
+    }
+    AbstractProcessor created = instantiate(parserClass);
+    // TODO using a ThreadSafe/reuse flag to actually decide if we want
+    // to reuse the same processor again and again
+    processors.put(parserClass, created);
+    return created;
+  }
+
+  /** Reflectively builds a new processor, translating failures. */
+  private static AbstractProcessor instantiate(String parserClass)
+      throws UnknownRecordTypeException {
+    try {
+      return (AbstractProcessor) Class.forName(parserClass).getConstructor()
+          .newInstance();
+    } catch (ClassNotFoundException e) {
+      throw new UnknownRecordTypeException("Unknown parserClass:"
+          + parserClass, e);
+    } catch (Exception e) {
+      throw new UnknownRecordTypeException("error constructing processor", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
new file mode 100644
index 0000000..a72e1bd
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * HBase extraction processor for system metrics data collected through
+ * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class SystemMetrics extends AbstractProcessor {
+
+  public SystemMetrics() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    String buffer = new String(recordEntry);
+    JSONObject json = (JSONObject) JSONValue.parse(buffer);
+    time = ((Long) json.get("timestamp")).longValue();
+    ChukwaRecord record = new ChukwaRecord();
+    JSONArray cpuList = (JSONArray) json.get("cpu");
+    double combined = 0.0;
+    double user = 0.0;
+    double sys = 0.0;
+    double idle = 0.0;
+    int actualSize = 0;
+    for (int i = 0; i < cpuList.size(); i++) {
+      JSONObject cpu = (JSONObject) cpuList.get(i);
+      // Work around for sigar returning null sometimes for cpu metrics on
+      // pLinux
+      if (cpu.get("combined") == null) {
+        continue;
+      }
+      actualSize++;
+      combined = combined + Double.parseDouble(cpu.get("combined").toString());
+      user = user + Double.parseDouble(cpu.get("user").toString());
+      sys = sys + Double.parseDouble(cpu.get("sys").toString());
+      idle = idle + Double.parseDouble(cpu.get("idle").toString());
+      for (@SuppressWarnings("unchecked")
+      Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
iterator
+          .hasNext();) {
+        String key = iterator.next();
+        addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+      }
+    }
+    combined = combined / actualSize;
+    user = user / actualSize;
+    sys = sys / actualSize;
+    idle = idle / actualSize;
+    addRecord("cpu.combined", Double.toString(combined));
+    addRecord("cpu.user", Double.toString(user));
+    addRecord("cpu.idle", Double.toString(idle));
+    addRecord("cpu.sys", Double.toString(sys));
+
+    addRecord("Uptime", json.get("uptime").toString());
+    JSONArray loadavg = (JSONArray) json.get("loadavg");
+    addRecord("LoadAverage.1", loadavg.get(0).toString());
+    addRecord("LoadAverage.5", loadavg.get(1).toString());
+    addRecord("LoadAverage.15", loadavg.get(2).toString());
+
+    record = new ChukwaRecord();
+    JSONObject memory = (JSONObject) json.get("memory");
+    @SuppressWarnings("unchecked")
+    Iterator<String> memKeys = memory.keySet().iterator();
+    while (memKeys.hasNext()) {
+      String key = memKeys.next();
+      addRecord("memory." + key, memory.get(key).toString());
+    }
+
+    record = new ChukwaRecord();
+    JSONObject swap = (JSONObject) json.get("swap");
+    @SuppressWarnings("unchecked")
+    Iterator<String> swapKeys = swap.keySet().iterator();
+    while (swapKeys.hasNext()) {
+      String key = swapKeys.next();
+      addRecord("swap." + key, swap.get(key).toString());
+    }
+
+    double rxBytes = 0;
+    double rxDropped = 0;
+    double rxErrors = 0;
+    double rxPackets = 0;
+    double txBytes = 0;
+    double txCollisions = 0;
+    double txErrors = 0;
+    double txPackets = 0;
+    record = new ChukwaRecord();
+    JSONArray netList = (JSONArray) json.get("network");
+    for (int i = 0; i < netList.size(); i++) {
+      JSONObject netIf = (JSONObject) netList.get(i);
+      @SuppressWarnings("unchecked")
+      Iterator<String> keys = netIf.keySet().iterator();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        record.add(key + "." + i, netIf.get(key).toString());
+        if (i != 0) {
+          if (key.equals("RxBytes")) {
+            rxBytes = rxBytes + (Long) netIf.get(key);
+          } else if (key.equals("RxDropped")) {
+            rxDropped = rxDropped + (Long) netIf.get(key);
+          } else if (key.equals("RxErrors")) {
+            rxErrors = rxErrors + (Long) netIf.get(key);
+          } else if (key.equals("RxPackets")) {
+            rxPackets = rxPackets + (Long) netIf.get(key);
+          } else if (key.equals("TxBytes")) {
+            txBytes = txBytes + (Long) netIf.get(key);
+          } else if (key.equals("TxCollisions")) {
+            txCollisions = txCollisions + (Long) netIf.get(key);
+          } else if (key.equals("TxErrors")) {
+            txErrors = txErrors + (Long) netIf.get(key);
+          } else if (key.equals("TxPackets")) {
+            txPackets = txPackets + (Long) netIf.get(key);
+          }
+        }
+      }
+    }
+
+    addRecord("network.RxBytes", Double.toString(rxBytes));
+    addRecord("network.RxDropped", Double.toString(rxDropped));
+    addRecord("network.RxErrors", Double.toString(rxErrors));
+    addRecord("network.RxPackets", Double.toString(rxPackets));
+    addRecord("network.TxBytes", Double.toString(txBytes));
+    addRecord("network.TxCollisions", Double.toString(txCollisions));
+    addRecord("network.TxErrors", Double.toString(txErrors));
+    addRecord("network.TxPackets", Double.toString(txPackets));
+
+    double readBytes = 0;
+    double reads = 0;
+    double writeBytes = 0;
+    double writes = 0;
+    double total = 0;
+    double used = 0;
+    record = new ChukwaRecord();
+    JSONArray diskList = (JSONArray) json.get("disk");
+    for (int i = 0; i < diskList.size(); i++) {
+      JSONObject disk = (JSONObject) diskList.get(i);
+      Iterator<String> keys = disk.keySet().iterator();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        record.add(key + "." + i, disk.get(key).toString());
+        if (key.equals("ReadBytes")) {
+          readBytes = readBytes + (Long) disk.get("ReadBytes");
+        } else if (key.equals("Reads")) {
+          reads = reads + (Long) disk.get("Reads");
+        } else if (key.equals("WriteBytes")) {
+          writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+        } else if (key.equals("Writes")) {
+          writes = writes + (Long) disk.get("Writes");
+        } else if (key.equals("Total")) {
+          total = total + (Long) disk.get("Total");
+        } else if (key.equals("Used")) {
+          used = used + (Long) disk.get("Used");
+        }
+      }
+    }
+    double percentUsed = used / total;
+    addRecord("disk.ReadBytes", Double.toString(readBytes));
+    addRecord("disk.Reads", Double.toString(reads));
+    addRecord("disk.WriteBytes", Double.toString(writeBytes));
+    addRecord("disk.Writes", Double.toString(writes));
+    addRecord("disk.Total", Double.toString(total));
+    addRecord("disk.Used", Double.toString(used));
+    addRecord("disk.PercentUsed", Double.toString(percentUsed));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
new file mode 100644
index 0000000..866eb2c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+
+/**
+ * Raised when no processor can be found or constructed for a record type.
+ */
+public class UnknownRecordTypeException extends Exception {
+
+  private static final long serialVersionUID = 8925135975093252279L;
+
+  /** Creates an exception with neither a detail message nor a cause. */
+  public UnknownRecordTypeException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with the given detail message.
+   *
+   * @param message detail message
+   */
+  public UnknownRecordTypeException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception wrapping {@code cause}; the detail message is
+   * derived from the cause.
+   *
+   * @param cause underlying failure
+   */
+  public UnknownRecordTypeException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates an exception with both a detail message and a cause.
+   *
+   * @param message detail message
+   * @param cause underlying failure
+   */
+  public UnknownRecordTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
new file mode 100644
index 0000000..d463dd1
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -0,0 +1,62 @@
+package org.apache.hadoop.chukwa.util;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+
+/**
+ * Row-key helpers for the HBase extraction processors.
+ */
+public class HBaseUtil {
+  private static Logger LOG = Logger.getLogger(HBaseUtil.class);
+
+  // Calendar and MessageDigest both hold mutable state and are not
+  // thread-safe. Every static method that touches these shared instances is
+  // therefore synchronized on the class, so that concurrent callers cannot
+  // interleave setTimeInMillis()/digest() calls.
+  static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+  static MessageDigest md5 = null;
+  static {
+    try {
+      md5 = MessageDigest.getInstance("md5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.warn(ExceptionUtil.getStackTrace(e));
+    }
+  }
+
+  public HBaseUtil() throws NoSuchAlgorithmException {
+  }
+
+  /**
+   * Builds a row key for {@code <metricGroup>.<metric>} and {@code source};
+   * same as {@link #buildKey(long, String, String)} with the joined name.
+   */
+  public byte[] buildKey(long time, String metricGroup, String metric,
+      String source) {
+    String fullKey = new StringBuilder(metricGroup).append(".")
+        .append(metric).toString();
+    return buildKey(time, fullKey, source);
+  }
+
+  /**
+   * Builds an 8-byte row key. The UTC day of year is written as ASCII
+   * digits starting at byte 0. Bytes 2-4 hold the first three bytes of the
+   * MD5 of {@code primaryKey}. Bytes 5-7 stay zero.
+   *
+   * <p>NOTE(review): only two bytes are reserved for the day. For days
+   * 100-366 the third digit is overwritten by the hash, so for example day 12
+   * and days 120-129 share a prefix. The year is not encoded at all. The
+   * layout is left unchanged here because it defines the stored key format.
+   */
+  public static synchronized byte[] buildKey(long time, String primaryKey) {
+    c.setTimeInMillis(time);
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] pk = getHash(primaryKey);
+    byte[] key = new byte[8];
+    System.arraycopy(day, 0, key, 0, day.length);
+    System.arraycopy(pk, 0, key, 2, 3);
+    return key;
+  }
+
+  /**
+   * Builds an 8-byte row key. The layout is the same as
+   * {@link #buildKey(long, String)}, except that bytes 5-7 hold the first
+   * three bytes of the MD5 of {@code source}. The same day-prefix caveat
+   * applies.
+   */
+  public static synchronized byte[] buildKey(long time, String primaryKey, String source) {
+    c.setTimeInMillis(time);
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] pk = getHash(primaryKey);
+    byte[] src = getHash(source);
+    byte[] key = new byte[8];
+    System.arraycopy(day, 0, key, 0, day.length);
+    System.arraycopy(pk, 0, key, 2, 3);
+    System.arraycopy(src, 0, key, 5, 3);
+    return key;
+  }
+
+  /** Returns the first three bytes of the MD5 digest of {@code key}. */
+  private static synchronized byte[] getHash(String key) {
+    byte[] hash = new byte[3];
+    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3);
+    return hash;
+  }
+}


Mime
View raw message