chukwa-commits mailing list archives

From ey...@apache.org
Subject svn commit: r987831 - in /incubator/chukwa/trunk: ./ conf/ ivy/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/...
Date Sat, 21 Aug 2010 21:17:06 GMT
Author: eyang
Date: Sat Aug 21 21:17:04 2010
New Revision: 987831

URL: http://svn.apache.org/viewvc?rev=987831&view=rev
Log:
CHUKWA-444. Added HBaseWriter for storing time series data in HBase for faster random read/write. (Eric Yang)

Added:
    incubator/chukwa/trunk/conf/hbase.schema
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Annotation.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClassUtils.java
    incubator/chukwa/trunk/src/test/hbase-site.xml
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/TextParser.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/build.xml
    incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template
    incubator/chukwa/trunk/conf/chukwa-env.sh.template
    incubator/chukwa/trunk/default.properties
    incubator/chukwa/trunk/ivy.xml
    incubator/chukwa/trunk/ivy/ivysettings.xml
    incubator/chukwa/trunk/ivy/libraries.properties
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sat Aug 21 21:17:04 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-444. Added HBaseWriter for storing time series data in HBase for faster random read/write. (Eric Yang)
+
     CHUKWA-473.  Make default processor configurable. (Bill Graham via asrabkin)
 
     CHUKWA-479.  Support HTTP trigger actions (Bill Graham via asrabkin)

Modified: incubator/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/build.xml?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/build.xml (original)
+++ incubator/chukwa/trunk/build.xml Sat Aug 21 21:17:04 2010
@@ -148,7 +148,7 @@
 		<property name="jdiff.stable.javadoc" value="http://hadoop.apache.org/core/docs/r${jdiff.stable}/api/" />
 
 		<!--this is the naming policy for artifacts we want pulled down-->
-		<property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[conf]/[artifact]-[revision].[ext]" />
+		<property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[conf]/[artifact]-[revision](-[classifier]).[ext]" />
 		<property name="jdiff.home" value="${build.ivy.lib.dir}/${ant.project.name}/jdiff" />
 		<property name="jdiff.jar" value="${jdiff.home}/jdiff-${jdiff.version}.jar" />
 		<property name="xerces.jar" value="${jdiff.home}/xerces-${xerces.version}.jar" />
@@ -509,6 +509,7 @@
 
 			</filterchain>
                 </copy>
+                <copy file="${test.src.dir}/hbase-site.xml" tofile="${test.build.dir}/classes/hbase-site.xml"></copy>
                 <copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
                 <copy file="${build.dir}/hicc.war" tofile="${test.build.classes}/webapps/hicc.war"></copy>
                 <copy file="${basedir}/conf/auth.conf" tofile="${test.build.dir}/conf/auth.conf"></copy>

Modified: incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template Sat Aug 21 21:17:04 2010
@@ -45,7 +45,31 @@
   </property>
 -->
 
+<!-- HBaseWriter parameters
+  <property>
+    <name>chukwaCollector.pipeline</name>
+    <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
+  </property>
+
+  <property>
+    <name>hbase.demux.package</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor</value>
+    <description>Demux parser class package. HBaseWriter scans this package for annotated demux parser classes and validates the HBase schema against their annotations.</description>
+  </property>
 
+  <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>host1.mydomain.com,host2.mydomain.com,host3.mydomain.com</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+    By default this is set to localhost for local and pseudo-distributed modes
+    of operation. For a fully-distributed setup, this should be set to a full
+    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
+    this is the list of servers which we will start/stop ZooKeeper on.
+    </description>
+  </property>
+
+-->
 
   <property>
     <name>writer.hdfs.filesystem</name>

Modified: incubator/chukwa/trunk/conf/chukwa-env.sh.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-env.sh.template?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-env.sh.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-env.sh.template Sat Aug 21 21:17:04 2010
@@ -53,6 +53,19 @@ export HADOOP_CONF_DIR="@TODO-HADOOP-CON
 # errors due to protocol mismatch between hadoop versions.
 #export HADOOP_JAR=${HADOOP_HOME}/build/hadoop-*-core.jar
 
+# Optional (i.e. will try sensible defaults)
+# The location of HBase jars.  For writing data to HBase, set the
+# HBASE_HOME environment variable to the HBase home directory.
+#export HBASE_HOME="@TODO-HBASE-HOME@"
+#export HBASE_JAR=`ls ${HBASE_HOME}/hbase*.jar`
+#export ZOO_KEEPER_JAR=`ls ${HBASE_HOME}/lib/zookeeper-*.jar`
+
+# Optional
+# The location of the HBase configuration directory.  For writing data to
+# HBase, set the HBASE_CONF_DIR environment variable to the HBase conf
+# directory.
+export HBASE_CONF_DIR="@TODO-HBASE-CONF-DIR@"
+
 # The location of chukwa data repository (in either HDFS or your local
 # file system, whichever you are using)
 export chukwaRecordsRepository="/chukwa/repos/"
@@ -92,3 +105,4 @@ export JDBC_URL_PREFIX=@TODO_CHUKWA_JDBC
 export CHUKWA_HICC_MIN_MEM=
 export CHUKWA_HICC_MAX_MEM=
 
+#export CLASSPATH=${CLASSPATH}:${HBASE_JAR}:${HBASE_CONF_DIR}:${ZOO_KEEPER_JAR}

Added: incubator/chukwa/trunk/conf/hbase.schema
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/hbase.schema?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/conf/hbase.schema (added)
+++ incubator/chukwa/trunk/conf/hbase.schema Sat Aug 21 21:17:04 2010
@@ -0,0 +1,64 @@
+create "Hadoop",{NAME => "ClientTrace", VERSIONS => 65535},
+{NAME => "Hadoop_jvm_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_namenode", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_FSDirectory", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_FSNamesystem", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_datanode", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_jobtracker", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_shuffleInput", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_shuffleOutput", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_tasktracker", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_job", VERSIONS => 65535},
+{NAME => "Hadoop_rpc_metrics", VERSIONS => 65535}
+create "HadoopLog", {NAME => "NameNode", VERSIONS => 65535},
+{NAME => "DataNode", VERSIONS => 65535},
+{NAME => "Audit", VERSIONS => 65535},
+{NAME => "HadoopLog", VERSIONS => 65535},
+{NAME => "HadoopLogInError", VERSIONS => 65535}
+create "SystemMetrics", {NAME => "Disk", VERSIONS => 65535},
+{NAME => "SystemMetrics", VERSIONS => 65535},
+{NAME => "Ps", VERSIONS => 65535},
+{NAME => "SysLog", VERSIONS => 65535},
+{NAME => "Top", VERSIONS => 65535},
+{NAME => "Df", VERSIONS => 65535},
+{NAME => "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Top", VERSIONS => 65535},
+{NAME => "TopInError", VERSIONS => 65535},
+{NAME => "DfInError", VERSIONS => 65535}
+create "Mapreduce",{NAME => "JobData", VERSIONS => 65535},
+{NAME => "JobConfData", VERSIONS => 65535},
+{NAME => "JobLogHistory", VERSIONS => 65535},
+{NAME => "TaskData", VERSIONS => 65535},
+{NAME => "TaskLogInError", VERSIONS => 65535}
+create "TsProcessor",{NAME => "log", VERSIONS => 65535}
+create "chukwa",{NAME => "Hadoop_rpc_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_jvm_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_metrics", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_namenode", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_FSDirectory", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_FSNamesystem", VERSIONS => 65535},
+{NAME => "Hadoop_dfs_datanode", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_jobtracker", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_shuffleInput", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_shuffleOutput", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_tasktracker", VERSIONS => 65535},
+{NAME => "Hadoop_mapred_job", VERSIONS => 65535},
+{NAME => "Df", VERSIONS => 65535},
+{NAME => "DfInError", VERSIONS => 65535},
+{NAME => "SystemMetrics", VERSION => 65535},
+{NAME => "Top", VERSION => 65535},
+{NAME => "TopInError", VERSION => 65535},
+{NAME => "DataNode", VERSION => 65535},
+{NAME => "ClientTrace", VERSION => 65535},
+{NAME => "Audit", VERSIONS => 65535},
+{NAME => "chunkQueue", VERSIONS => 65535},
+{NAME => "NameNode", VERSIONS => 65535},
+{NAME => "HadoopLog", VERSIONS => 65535},
+{NAME => "HadoopLogInError", VERSIONS => 65535},
+{NAME => "TaskLog", VERSIONS => 65535},
+{NAME => "TaskLogInError", VERSIONS => 65535},
+{NAME => "mapredInError", VERSION => 65535},
+{NAME => "chukwaAgent", VERSIONS => 65535},
+{NAME => "chukwaHttpSender", VERSIONS => 65535}

Modified: incubator/chukwa/trunk/default.properties
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/default.properties?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/default.properties (original)
+++ incubator/chukwa/trunk/default.properties Sat Aug 21 21:17:04 2010
@@ -38,4 +38,6 @@ TODO_CHUKWA_JDBC_URL_PREFIX=
 TODO-NAGIOS-SERVER=localhost
 TODO-DEMUX-IO-SORT-MB=64
 TODO-DEMUX-FS-INMEMORY-SIZE_MB=64
-TODO-DEMUX-IO-SORT-FACTOR=10
\ No newline at end of file
+TODO-DEMUX-IO-SORT-FACTOR=10
+TODO-HBASE-HOME=/home/user/Development/hbase-trunk
+TODO-HBASE-CONF-DIR=/home/user/Development/hbase-trunk/conf

Modified: incubator/chukwa/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/ivy.xml?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/ivy.xml (original)
+++ incubator/chukwa/trunk/ivy.xml Sat Aug 21 21:17:04 2010
@@ -16,7 +16,7 @@
    limitations under the License.
 -->
 
-<ivy-module version="1.0">
+<ivy-module version="1.0" xmlns:m="http://ant.apache.org/ivy/maven">
   <info organisation="org.apache.hadoop.chukwa" module="${ant.project.name}">
     <license name="Apache 2.0"/>
     <ivyauthor name="Apache Hadoop Chukwa Team" url="http://hadoop.apache.org/chukwa"/>
@@ -64,11 +64,18 @@
     <dependency org="org.apache.hbase"
       name="hbase"
       rev="${hbase.version}"
+      conf="common->master">
+      <artifact name="hbase" type="jar"/>
+      <artifact name="hbase" type="tests" ext="jar" m:classifier="tests"/>
+    </dependency>
+    <dependency org="com.google.guava"
+      name="guava"
+      rev="${guava.version}"
       conf="common->master"/>
-    <dependency org="org.apache.hadoop.zookeeper"
+    <!-- <dependency org="org.apache.hadoop.zookeeper"
       name="zookeeper"
       rev="${zookeeper.version}"
-      conf="common->master"/> 
+      conf="common->master"/> -->
     <dependency org="com.sun.jersey"
       name="jersey-core"
       rev="${jersey.version}"/>

Modified: incubator/chukwa/trunk/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/ivy/ivysettings.xml?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/ivy/ivysettings.xml (original)
+++ incubator/chukwa/trunk/ivy/ivysettings.xml Sat Aug 21 21:17:04 2010
@@ -42,6 +42,9 @@
   <property name="ibiblio.org"
     value="http://mirrors.ibiblio.org/pub/mirrors/maven2/"
     override="false"/>
+  <property name="google.com"
+    value="http://google-maven-repository.googlecode.com/svn/repository/"
+    override="false"/>
   <property name="maven2.pattern"
     value="[organisation]/[module]/[revision]/[module]-[revision]"/>
   <property name="maven2.pattern.ext"
@@ -75,12 +78,17 @@
       root="${ibiblio.org}"
       m2compatible="true"
       />
+    <ibiblio name="google"
+      root="${google.com}"
+      m2compatible="true"
+      />
     <chain name="default" dual="true">
       <resolver ref="local"/>
       <resolver ref="apache"/>
       <resolver ref="rawson"/>
       <resolver ref="facebook"/>
       <resolver ref="ibiblio"/>
+      <resolver ref="google"/>
     </chain>
     <chain name="internal">
       <resolver ref="local"/>
@@ -89,6 +97,7 @@
     <chain name="external">
       <resolver ref="maven2"/>
       <resolver ref="facebook"/>
+      <resolver ref="google"/>
     </chain>
     <chain name="external-and-snapshots">
       <resolver ref="maven2"/>

Modified: incubator/chukwa/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/ivy/libraries.properties?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/ivy/libraries.properties (original)
+++ incubator/chukwa/trunk/ivy/libraries.properties Sat Aug 21 21:17:04 2010
@@ -53,4 +53,5 @@ hbase.version=0.89.0-SNAPSHOT
 thrift.version=0.2.0
 zookeeper.version=3.2.2
 jersey.version=1.1.5.1
+guava.version=r05
 #avro.version=1.3.2

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java Sat Aug 21 21:17:04 2010
@@ -23,7 +23,7 @@ import org.apache.hadoop.chukwa.Chunk;
 
 
 public abstract class PipelineableWriter implements ChukwaWriter {
-  ChukwaWriter next;
+  protected ChukwaWriter next;
   public void setNextStage(ChukwaWriter next) {
     this.next = next;
   }

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Annotation.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Annotation.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Annotation.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Annotation.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.hbase;
+
+import java.lang.annotation.*;
+
+public class Annotation {
+
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE})
+  public @interface Tables {
+    Table[] annotations();
+  }
+
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE})
+  public @interface Table {
+    String name();
+    String columnFamily();
+  }
+
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD,ElementType.FIELD})
+  public @interface ColumnFamily {
+  }
+
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.LOCAL_VARIABLE})
+  public @interface RowKey {
+  }
+
+}
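
Because Table and Tables are retained at runtime, a demux parser declares its HBase destination directly on the class and HBaseWriter discovers it reflectively. A minimal sketch of the pattern (MyDiskParser is a hypothetical name; the real parsers annotated later in this commit do the same thing):

    import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;

    @Table(name = "SystemMetrics", columnFamily = "Disk")
    public class MyDiskParser {
      public static void main(String[] args) {
        // RetentionPolicy.RUNTIME keeps the annotation visible to reflection,
        // which is how HBaseWriter.verifyHbaseSchema() finds it.
        Table t = MyDiskParser.class.getAnnotation(Table.class);
        System.out.println(t.name() + ":" + t.columnFamily()); // SystemMetrics:Disk
      }
    }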

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.hbase;
+
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
+
+import org.apache.hadoop.chukwa.util.ClassUtils;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.OutputCollector;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.log4j.Logger;
+
+public class HBaseWriter extends PipelineableWriter {
+  static Logger log = Logger.getLogger(HBaseWriter.class);
+  boolean reportStats;
+  volatile long dataSize = 0;
+  final Timer statTimer;
+  private OutputCollector output;
+  private Reporter reporter;
+  private ChukwaConfiguration conf = new ChukwaConfiguration();
+  String defaultProcessor = conf.get(
+      "chukwa.demux.mapper.default.processor",
+      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+  private HTablePool pool;
+  private Configuration hconf;
+  
+  private class StatReportingTask extends TimerTask {
+    private long lastTs = System.currentTimeMillis();
+    private long lastDataSize = 0;
+
+    public void run() {
+      long time = System.currentTimeMillis();
+      long interval = time - lastTs;
+      lastTs = time;
+
+      long ds = dataSize;
+      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
+      // refers only to data field, not including http or chukwa headers
+      lastDataSize = ds;
+
+      log.info("stat=HBaseWriter|dataRate="
+          + dataRate);
+    }
+  };
+
+  public HBaseWriter() {
+    this(true);
+  }
+
+  public HBaseWriter(boolean reportStats) {
+    this.reportStats = reportStats;
+    statTimer = new Timer();
+    hconf = HBaseConfiguration.create();
+  }
+
+  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
+    this(true);
+    this.conf = conf;
+    this.hconf = hconf;
+  }
+
+  public void close() {
+    statTimer.cancel();
+  }
+
+  public void init(Configuration conf) throws WriterException {
+    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    output = new OutputCollector();
+    reporter = new Reporter();
+    verifyHbaseSchema();
+    pool = new HTablePool(hconf, 60);
+  }
+
+  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
+    boolean status = false;
+    try {
+      if(admin.tableExists(table.name())) {
+        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
+        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
+        for(HColumnDescriptor cd : columnDescriptors) {
+          if(cd.getNameAsString().equals(table.columnFamily())) {
+            log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
+            status = true;
+          }
+        }
+      } else {
+        throw new Exception("HBase table: "+table.name()+ " does not exist.");
+      }
+    } catch(Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+      status = false;
+    }
+    return status;    
+  }
+  
+  private void verifyHbaseSchema() {
+    log.debug("Verify Demux parser with HBase schema");
+    boolean schemaVerified = false;
+    try {
+      HBaseAdmin admin = new HBaseAdmin(hconf);
+      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
+      for(Class<?> x : demuxParsers) {
+        if(x.isAnnotationPresent(Tables.class)) {
+          Tables list = x.getAnnotation(Tables.class);
+          for(Table table : list.annotations()) {
+            if(!verifyHbaseTable(admin, table)) {
+              throw new Exception("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");              
+            }
+          }
+        } else if(x.isAnnotationPresent(Table.class)) {
+          Table table = x.getAnnotation(Table.class);
+          if(!verifyHbaseTable(admin, table)) {
+            throw new Exception("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
+          }
+        }
+      }
+      schemaVerified = true;
+    } catch (Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+    }
+    if(!schemaVerified) {
+      log.error("Hbase schema mismatch with demux parser.");
+      if(conf.getBoolean("halt.on.schema.mismatch", true)) {
+        log.error("Exiting...");
+        DaemonWatcher.bailout(-1);
+      }
+    }
+  }
+
+  @Override
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    try {
+      for(Chunk chunk : chunks) {
+        String processorClass = conf.get(chunk.getDataType(),
+                defaultProcessor);
+        synchronized (this) {
+          MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
+          try {
+            Table table = null;
+            if(processor.getClass().isAnnotationPresent(Table.class)) {
+              table = processor.getClass().getAnnotation(Table.class);
+            } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
+              Tables tables = processor.getClass().getAnnotation(Tables.class);
+              for(Table t : tables.annotations()) {
+                table = t;
+              }
+            }
+            if(table!=null) {
+              HTableInterface hbase = pool.getTable(table.name());  
+              processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
+              hbase.put(output.getKeyValues());
+              pool.putTable(hbase);
+            }
+          } catch (Exception e) {
+            log.warn(ExceptionUtil.getStackTrace(e));
+          }
+          dataSize += chunk.getData().length;
+          output.clear();
+          reporter.clear();
+        }
+      }
+    } catch (Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+      throw new WriterException("Failed to store data to HBase.");
+    }    
+    if (next != null) {
+      rv = next.add(chunks); //pass data through
+    }
+    return rv;
+  }
+
+}
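
For reference, a minimal driver sketch, not part of this commit, that exercises the writer outside a collector. It assumes a running HBase with conf/hbase.schema loaded and hbase-site.xml on the classpath; the class name, sample chunk, and property values are illustrative only:

    import java.util.Collections;

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.ChunkImpl;
    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class HBaseWriterSketch {
      public static void main(String[] args) throws Exception {
        ChukwaConfiguration conf = new ChukwaConfiguration();
        // package scanned by verifyHbaseSchema() for @Table/@Tables parsers
        conf.set("hbase.demux.package",
            "org.apache.hadoop.chukwa.extraction.demux.processor");
        // map the chunk data type to a parser class, as demux does
        conf.set("SysLog",
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog");
        Configuration hconf = HBaseConfiguration.create(); // reads hbase-site.xml
        HBaseWriter writer = new HBaseWriter(conf, hconf);
        writer.init(conf); // starts stats timer, verifies schema, opens table pool
        Chunk c = new ChunkImpl("SysLog", "/var/log/messages", 1L,
            "Aug 21 21:17:04 host1 chukwa: hello".getBytes(), null);
        writer.add(Collections.singletonList(c)); // one Put per parsed record field
        writer.close();
      }
    }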

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public class OutputCollector implements
+    org.apache.hadoop.mapred.OutputCollector<ChukwaRecordKey, ChukwaRecord> {
+  
+  private List<Put> buffers;
+
+  public OutputCollector() {
+    buffers = new ArrayList<Put>();
+  }
+  
+  @Override
+  public void collect(ChukwaRecordKey key, ChukwaRecord value) throws IOException {
+    StringBuffer s = new StringBuffer();
+    String[] keyParts = key.getKey().split("/");
+    s.append(keyParts[0]);
+    s.append("-");
+    s.append(keyParts[1]);
+
+    for(String field : value.getFields()) {
+        Put kv = new Put(s.toString().getBytes());
+        kv.add(key.getReduceType().getBytes(), field.getBytes(), value.getTime(), value.getValue(field).getBytes());
+        buffers.add(kv);
+    }    
+  }
+
+  public List<Put> getKeyValues() {
+    return buffers;
+  }
+
+  public void clear() {
+    buffers.clear();
+  }
+  
+}
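
collect() flattens the demux key into an HBase coordinate: for a ChukwaRecordKey of the form "<timePartition>/<source>/..." (as built by AbstractProcessor.buildGenericRecord), the row key becomes "<timePartition>-<source>", the reduce type becomes the column family, and each record field becomes one Put. A small sketch with made-up values:

    import java.util.List;

    import org.apache.hadoop.chukwa.datacollection.writer.hbase.OutputCollector;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.hbase.client.Put;

    public class OutputCollectorSketch {
      public static void main(String[] args) throws Exception {
        OutputCollector output = new OutputCollector();
        ChukwaRecordKey key = new ChukwaRecordKey();
        key.setKey("1282424224000/host1.example.com/1282424224000");
        key.setReduceType("SystemMetrics");  // becomes the column family
        ChukwaRecord record = new ChukwaRecord();
        record.setTime(1282424224000L);      // becomes the cell timestamp
        record.add("load_15", "0.42");       // becomes qualifier and value
        output.collect(key, record);
        List<Put> puts = output.getKeyValues();
        System.out.println(puts.size());     // 1 Put, row "1282424224000-host1.example.com"
      }
    }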

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.hbase;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+public class Reporter implements org.apache.hadoop.mapred.Reporter {
+
+  @Override
+  public Counter getCounter(Enum<?> arg0) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Counter getCounter(String arg0, String arg1) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void incrCounter(Enum<?> arg0, long arg1) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void incrCounter(String arg0, String arg1, long arg2) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void setStatus(String arg0) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void progress() {
+    // TODO Auto-generated method stub
+    
+  }
+
+  public void clear() {
+    // TODO Auto-generated method stub
+    
+  }
+
+}
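
This Reporter is a deliberate no-op stub: it satisfies the org.apache.hadoop.mapred.Reporter interface so that the demux MapProcessors, written against the MapReduce API, can be invoked by HBaseWriter inside the collector without a running job.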

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Sat Aug 21 21:17:04 2010
@@ -125,5 +125,5 @@ public abstract class AbstractProcessor 
     startOffset = recordOffsets[currentPos] + 1;
     currentPos++;
     return RecordConstants.recoverRecordSeparators("\n", log);
-  }
+  }  
 }

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java Sat Aug 21 21:17:04 2010
@@ -23,19 +23,18 @@ import java.net.InetAddress;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.Logger;
 
+@Table(name="Hadoop",columnFamily="ClientTrace")
 public class ClientTraceProcessor extends AbstractProcessor {
-
   private static final String recordType = "ClientTrace";
   private final SimpleDateFormat sdf =
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java Sat Aug 21 21:17:04 2010
@@ -23,14 +23,18 @@ import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="SystemMetrics",columnFamily="Disk")
 public class Df extends AbstractProcessor {
   static Logger log = Logger.getLogger(Df.class);
+  
   private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
       "Used", "Available", "Use%", "Mounted", "on" };
   private static final String[] headerCols = { "Filesystem", "1K-blocks",
@@ -59,7 +63,7 @@ public class Df extends AbstractProcesso
       Date d = sdf.parse(dStr);
       String[] lines = body.split("\n");
 
-      String[] outputCols = lines[0].split("[\\s]++");
+      String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++");
 
       if (outputCols.length != headerSplitCols.length
           || outputCols[0].intern() != headerSplitCols[0].intern()

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Sat Aug 21 21:17:04 2010
@@ -23,12 +23,20 @@ import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Tables(annotations={
+@Table(name="HadoopLog",columnFamily="NameNode"),
+@Table(name="HadoopLog",columnFamily="DataNode"),
+@Table(name="HadoopLog",columnFamily="Audit")
+})
 public class HadoopLogProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(HadoopLogProcessor.class);
 

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Sat Aug 21 21:17:04 2010
@@ -25,6 +25,9 @@ import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -33,8 +36,36 @@ import org.apache.log4j.Logger;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+@Tables(annotations={
+@Table(name="Hadoop",columnFamily="Hadoop_jvm_metrics"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_metrics"),
+@Table(name="Hadoop",columnFamily="Hadoop_dfs_metrics"),
+@Table(name="Hadoop",columnFamily="Hadoop_dfs_namenode"),
+@Table(name="Hadoop",columnFamily="Hadoop_dfs_FSDirectory"),
+@Table(name="Hadoop",columnFamily="Hadoop_dfs_FSNamesystem"),
+@Table(name="Hadoop",columnFamily="Hadoop_dfs_datanode"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_jobtracker"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_shuffleInput"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_shuffleOutput"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_tasktracker"),
+@Table(name="Hadoop",columnFamily="Hadoop_mapred_job"),
+@Table(name="Hadoop",columnFamily="Hadoop_rpc_metrics")
+})
+@Deprecated
 public class HadoopMetricsProcessor extends AbstractProcessor {
-
+  public static final String jvm = "Hadoop_jvm_metrics";
+  public static final String mapred = "Hadoop_mapred_metrics";
+  public static final String dfs = "Hadoop_dfs_metrics";
+  public static final String namenode = "Hadoop_dfs_namenode";
+  public static final String fsdir = "Hadoop_dfs_FSDirectory";
+  public static final String fsname = "Hadoop_dfs_FSNamesystem";
+  public static final String datanode = "Hadoop_dfs_datanode";
+  public static final String jobtracker = "Hadoop_mapred_jobtracker";
+  public static final String shuffleIn = "Hadoop_mapred_shuffleInput";
+  public static final String shuffleOut = "Hadoop_mapred_shuffleOutput";
+  public static final String tasktracker = "Hadoop_mapred_tasktracker";
+  public static final String mr = "Hadoop_mapred_job";
+  
   static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
   static final String chukwaTimestampField = "chukwa_timestamp";
   static final String contextNameField = "contextName";

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Sat Aug 21 21:17:04 2010
@@ -23,16 +23,19 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="SystemMetrics",columnFamily="SystemMetrics")
 public class Iostat extends AbstractProcessor {
   static Logger log = Logger.getLogger(Iostat.class);
   public final String recordType = this.getClass().getName();
-
+  
   private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
   private static Pattern p = null;
 

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Sat Aug 21 21:17:04 2010
@@ -27,21 +27,29 @@ import java.util.regex.Pattern;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 import org.json.JSONObject;
-import org.mortbay.util.ajax.JSON;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 
+@Tables(annotations={
+@Table(name="Mapreduce",columnFamily="JobData"),
+@Table(name="Mapreduce",columnFamily="JobConfData")
+})
 public class JobConfProcessor extends AbstractProcessor {
     static Logger log = Logger.getLogger(JobConfProcessor.class);
+    private static final String jobData = "JobData";
+    private static final String jobConfData = "JobConfData";
+    
     static  Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
     static  Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
     @Override
@@ -122,7 +130,7 @@ public class JobConfProcessor extends Ab
         record.add("JOBCONF-JSON", json.toString());
         record.add("mapred.job.queue.name", queue);
         record.add("JOBID", "job_" + jobID);
-        buildGenericRecord(record, null, time, "JobData");
+        buildGenericRecord(record, null, time, jobData);
         calendar.setTimeInMillis(time);
         calendar.set(Calendar.MINUTE, 0);
         calendar.set(Calendar.SECOND, 0);
@@ -131,7 +139,7 @@ public class JobConfProcessor extends Ab
         output.collect(key, record);
 
         jobConfRecord.add("JOBID", "job_" + jobID);
-        buildGenericRecord(jobConfRecord, null, time, "JobConfData");
+        buildGenericRecord(jobConfRecord, null, time, jobConfData);
         output.collect(key, jobConfRecord);
             
         tmp.delete();
@@ -142,6 +150,6 @@ public class JobConfProcessor extends Ab
   }
   
   public String getDataType() {
-    return Torque.class.getName();
+    return JobConfProcessor.class.getName();
   }
 }

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java Sat Aug 21 21:17:04 2010
@@ -19,18 +19,24 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.util.ArrayList;
-import java.util.Date;
+
 import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.JobHistory;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+@Tables(annotations={
+@Table(name="Mapreduce",columnFamily="JobData"),
+@Table(name="Mapreduce",columnFamily="TaskData")
+})
 public class JobLog extends AbstractProcessor {
   private String savedLines = "";
   

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Sat Aug 21 21:17:04 2010
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -31,6 +33,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="Mapreduce",columnFamily="JobLogHistory")
 public class JobLogHistoryProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
 

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java Sat Aug 21 21:17:04 2010
@@ -20,19 +20,21 @@ package org.apache.hadoop.chukwa.extract
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="SystemMetrics",columnFamily="Ps")
 public class Ps extends AbstractProcessor {
   static Logger log = Logger.getLogger(Ps.class);
+  public static final String reduceType = "Ps";
 
   @Override
   protected void parse(String recordEntry,
@@ -43,7 +45,7 @@ public class Ps extends AbstractProcesso
     for (HashMap<String, String> processInfo : ps.getProcessList()) {
       key = new ChukwaRecordKey();
       ChukwaRecord record = new ChukwaRecord();
-      this.buildGenericRecord(record, null, log.getDate().getTime(), "Ps");
+      this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType);
       for (Entry<String, String> entry : processInfo.entrySet()) {
         record.add(entry.getKey(), entry.getValue());
       }

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java Sat Aug 21 21:17:04 2010
@@ -23,14 +23,18 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="SystemMetrics",columnFamily="SystemMetrics")
 public class Sar extends AbstractProcessor {
   static Logger log = Logger.getLogger(Sar.class);
+  public static final String reduceType = "SystemMetrics";
   public final String recordType = this.getClass().getName();
 
   private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
@@ -94,31 +98,31 @@ public class Sar extends AbstractProcess
 
             record = new ChukwaRecord();
             key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+            this.buildGenericRecord(record, null, d.getTime(), reduceType);
           } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
             log.debug("Matched Sar-Network");
 
             record = new ChukwaRecord();
             key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+            this.buildGenericRecord(record, null, d.getTime(), reduceType);
           } else if (headers[1].equals("kbmemfree")) {
             log.debug("Matched Sar-Memory");
 
             record = new ChukwaRecord();
             key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+            this.buildGenericRecord(record, null, d.getTime(), reduceType);
           } else if (headers[1].equals("totsck")) {
             log.debug("Matched Sar-NetworkSockets");
 
             record = new ChukwaRecord();
             key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+            this.buildGenericRecord(record, null, d.getTime(), reduceType);
           } else if (headers[1].equals("runq-sz")) {
             log.debug("Matched Sar-LoadAverage");
 
             record = new ChukwaRecord();
             key = new ChukwaRecordKey();
-            this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+            this.buildGenericRecord(record, null, d.getTime(), reduceType);
           } else {
             log.debug("No match:" + headers[1] + " " + headers[2]);
           }

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java Sat Aug 21 21:17:04 2010
@@ -24,15 +24,19 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Table(name="SystemMetrics",columnFamily="SysLog")
 public class SysLog extends AbstractProcessor {
 
   static Logger log = Logger.getLogger(SysLog.class);
+  private static final String reduceType = "SysLog";  
   private SimpleDateFormat sdf = null;
 
   public SysLog() {
@@ -63,7 +67,7 @@ public class SysLog extends AbstractProc
 
       ChukwaRecord record = new ChukwaRecord();
       buildGenericRecord(record, recordEntry, convertDate.getTime().getTime(),
-          "SysLog");
+          reduceType);
       output.collect(key, record);
     } catch (ParseException e) {
       e.printStackTrace();

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java Sat Aug 21 21:17:04 2010
@@ -25,14 +25,22 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+@Tables(annotations={
+@Table(name="SystemMetrics",columnFamily="SystemMetrics"),
+@Table(name="SystemMetrics",columnFamily="Top")
+})
 public class Top extends AbstractProcessor {
   static Logger log = Logger.getLogger(Top.class);
+  public final String reduceType = "SystemMetrics";
   public final String recordType = this.getClass().getName();
 
   private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
@@ -77,7 +85,7 @@ public class Top extends AbstractProcess
         record = new ChukwaRecord();
         key = new ChukwaRecordKey();
         parseSummary(record, summaryString);
-        this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+        this.buildGenericRecord(record, null, d.getTime(), reduceType);
         output.collect(key, record);
 
         StringBuffer buffer = new StringBuffer();
@@ -90,7 +98,7 @@ public class Top extends AbstractProcess
         }
         record = new ChukwaRecord();
         key = new ChukwaRecordKey();
-        this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
+        this.buildGenericRecord(record, buffer.toString(), d.getTime(), recordType);
         // Output Top info to database
         output.collect(key, record);
 

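The @Tables wrapper above lets one processor declare several HBase destinations, while single-destination processors use a bare @Table. Below is a minimal sketch of how a writer might read these declarations, assuming the annotations defined in Annotation.java (not shown in this diff) carry runtime retention; TableResolver is illustrative only and not part of this commit:

    import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
    import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;

    public class TableResolver {
      // Prints every table/columnFamily pair declared by a processor class.
      public static void printDestinations(Class<?> processor) {
        if (processor.isAnnotationPresent(Tables.class)) {
          for (Table t : processor.getAnnotation(Tables.class).annotations()) {
            System.out.println(t.name() + "/" + t.columnFamily());
          }
        } else if (processor.isAnnotationPresent(Table.class)) {
          Table t = processor.getAnnotation(Table.class);
          System.out.println(t.name() + "/" + t.columnFamily());
        }
      }
    }

For Top.class this would print SystemMetrics/SystemMetrics and SystemMetrics/Top.
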
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=987831&r1=987830&r2=987831&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Sat Aug 21 21:17:04 2010
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.demux.Demux;
@@ -59,6 +60,7 @@ import org.apache.log4j.Logger;
  * </UL>
  *
  */
+@Table(name="TsProcessor",columnFamily="log")
 public class TsProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(TsProcessor.class);
 

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClassUtils.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClassUtils.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClassUtils.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ClassUtils.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import org.apache.log4j.Logger;
+
+public class ClassUtils {
+  static Logger log = Logger.getLogger(ClassUtils.class);
+  
+  /**
+   * Attempts to list all the classes in the specified package as determined
+   * by the context class loader
+   * 
+   * @param pckgname
+   *            the package name to search
+   * @return a list of classes that exist within that package
+   * @throws ClassNotFoundException
+   *             if the context class loader is unavailable, the package
+   *             cannot be resolved, or a matching class cannot be loaded
+   */
+  public static List<Class> getClassesForPackage(String pckgname)
+      throws ClassNotFoundException {
+    List<Class> classes = new ArrayList<Class>();
+    // This will hold the directories matching pckgname. There may be more
+    // than one if the package is split over multiple jars/paths.
+    ArrayList<File> directories = new ArrayList<File>();
+    try {
+      ClassLoader cld = Thread.currentThread().getContextClassLoader();
+      if (cld == null) {
+        throw new ClassNotFoundException("Can't get class loader.");
+      }
+      // Ask for all resources for the path
+      Enumeration<URL> resources = cld.getResources(pckgname.replace('.', '/'));
+      while (resources.hasMoreElements()) {
+        URL res = resources.nextElement();
+        if (res.getProtocol().equalsIgnoreCase("jar")) {
+          JarURLConnection conn = (JarURLConnection) res.openConnection();
+          JarFile jar = conn.getJarFile();
+          for (JarEntry e : Collections.list(jar.entries())) {
+
+            if (e.getName().startsWith(pckgname.replace('.', '/'))
+                && e.getName().endsWith(".class") && !e.getName().contains("$")) {
+              String className = e.getName().replace("/", ".").substring(0,
+                  e.getName().length() - 6);
+              classes.add(Class.forName(className));
+            }
+          }
+        } else
+          directories.add(new File(URLDecoder.decode(res.getPath(), "UTF-8")));
+      }
+    } catch (NullPointerException x) {
+      throw new ClassNotFoundException(pckgname + " does not appear to be "
+          + "a valid package (Null pointer exception)");
+    } catch (UnsupportedEncodingException encex) {
+      throw new ClassNotFoundException(pckgname + " does not appear to be "
+          + "a valid package (Unsupported encoding)");
+    } catch (IOException ioex) {
+      throw new ClassNotFoundException("IOException was thrown when trying "
+          + "to get all resources for " + pckgname);
+    }
+
+    // For every directory identified capture all the .class files
+    for (File directory : directories) {
+      if (directory.exists()) {
+        // Get the list of the files contained in the package
+        String[] files = directory.list();
+        for (String file : files) {
+          // we are only interested in .class files
+          if (file.endsWith(".class")) {
+            // removes the .class extension
+            classes.add(Class.forName(pckgname + '.'
+                + file.substring(0, file.length() - 6)));
+          }
+        }
+      } else {
+        throw new ClassNotFoundException(pckgname + " (" + directory.getPath()
+            + ") does not appear to be a valid package");
+      }
+    }
+    return classes;
+  }
+
+  public static List<Class> getClassessOfInterface(String thePackage,
+      Class theInterface) {
+    List<Class> classList = new ArrayList<Class>();
+    try {
+      for (Class discovered : getClassesForPackage(thePackage)) {
+        if (Arrays.asList(discovered.getInterfaces()).contains(theInterface)) {
+          classList.add(discovered);
+        }
+      }
+    } catch (ClassNotFoundException ex) {
+      log.error(ExceptionUtil.getStackTrace(ex));
+    }
+
+    return classList;
+  }
+  
+
+}

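For reference, a hedged usage sketch of the utility above; MapProcessor is used here only as an illustrative interface from the demux mapper package:

    import java.util.List;
    import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
    import org.apache.hadoop.chukwa.util.ClassUtils;

    public class ProcessorDiscovery {
      public static void main(String[] args) throws ClassNotFoundException {
        // All classes in the demux mapper package, from jars or directories.
        List<Class> all = ClassUtils.getClassesForPackage(
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper");
        // Only classes that list the interface in getInterfaces() match here,
        // so interfaces inherited through a superclass are not detected.
        List<Class> impls = ClassUtils.getClassessOfInterface(
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper",
            MapProcessor.class);
        System.out.println(all.size() + " classes, "
            + impls.size() + " implementations");
      }
    }
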
Added: incubator/chukwa/trunk/src/test/hbase-site.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/hbase-site.xml?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/hbase-site.xml (added)
+++ incubator/chukwa/trunk/src/test/hbase-site.xml Sat Aug 21 21:17:04 2010
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval between messages from the RegionServer to HMaster
+    in milliseconds.  Default is 15. Set this value low if you want unit
+    tests to be responsive.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.pause</name>
+    <value>5000</value>
+    <description>General client pause value.  Used mostly as value to wait
+    before running a retry of a failed get, region lookup, etc.</description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>10000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans of
+    the root and meta tables.
+    </description>
+  </property>
+  <property>
+    <name>hbase.server.thread.wakefrequency</name>
+    <value>1000</value>
+    <description>Time to sleep in between searches for work (in milliseconds).
+    Used as sleep interval by service threads such as META scanner and log roller.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>5</value>
+    <description>Count of RPC Server instances spun up on RegionServers.
+    The same property is used by the HMaster for the count of master handlers.
+    Default is 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.period</name>
+    <value>6000</value>
+    <description>Length of time the master will wait before timing out a region
+    server lease. Since region servers report in every second (see above), this
+    value has been reduced so that the master will notice a dead region server
+    sooner. The default is 30 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase master web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase regionserver web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port.auto</name>
+    <value>true</value>
+    <description>Info server auto port bind. Enables automatic port
+    search if hbase.regionserver.info.port is already in use.
+    Enabled for testing to run multiple tests on one machine.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.thread.wakefrequency</name>
+    <value>3000</value>
+    <description>The interval between checks for expired region server leases.
+    This value has been reduced due to the other reduced values above so that
+    the master will notice a dead region server sooner. The default is 15 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.safemode</name>
+    <value>false</value>
+    <description>
+    Turn on/off safe mode in region server. Always on for production, always off
+    for tests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>67108864</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 256M.
+
+    Keep the maximum filesize small so we split more often in tests.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>${user.dir}/../logs</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>21810</value>
+    <description>Property from ZooKeeper's config zoo.cfg.
+    The port at which the clients will connect.
+    </description>
+  </property>
+</configuration>

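This file sits under src/test, so if the build places src/test on the unit-test classpath (which the test below appears to rely on), HBaseConfiguration loads it as a default resource. A minimal check, under that assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ZkPortCheck {
      public static void main(String[] args) {
        // create() layers hbase-default.xml and hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        // Prints 21810, per the property defined above.
        System.out.println(conf.get("hbase.zookeeper.property.clientPort"));
      }
    }
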
Added: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java (added)
+++ incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+
+public class TestHBaseWriter extends TestCase{
+  static Logger log = Logger.getLogger(TestHBaseWriter.class);
+  private HBaseTestingUtility util;
+  private HBaseWriter hbw;
+  private Configuration conf;
+  private byte[] columnFamily = Bytes.toBytes("TestColumnFamily");
+  private byte[] qualifier = Bytes.toBytes("Key");
+  private byte[] expectedValue = Bytes.toBytes("Value");
+
+  private byte[] table = Bytes.toBytes("Test");
+  private byte[] test = Bytes.toBytes("1234567890 Key Value");
+  private ChukwaConfiguration cc;
+  long timestamp = 1234567890;
+  
+  public TestHBaseWriter() {
+    cc = new ChukwaConfiguration();
+
+    conf = HBaseConfiguration.create();
+    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+    try {
+      util = new HBaseTestingUtility(conf);
+      util.startMiniZKCluster();
+      util.getConfiguration().setBoolean("dfs.support.append", true);
+      util.startMiniCluster(2);
+      HTableDescriptor desc = new HTableDescriptor();
+      HColumnDescriptor family = new HColumnDescriptor(columnFamily);
+      desc.setName(table);
+      desc.addFamily(family);
+      util.getHBaseAdmin().createTable(desc);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+  
+  public void setUp() {
+    
+  }
+  
+  public void tearDown() {
+    
+  }
+  
+  public void testWriters() {
+    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    chunks.add(new ChunkImpl("TextParser", "name", timestamp, test, null));      
+    try {      
+      cc.set("hbase.demux.package", "org.apache.chukwa.datacollection.writer.test.demux");
+      cc.set("TextParser","org.apache.hadoop.chukwa.datacollection.writer.test.demux.TextParser");
+      conf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
+      hbw = new HBaseWriter(cc, conf);
+      hbw.init(cc);
+      if(hbw.add(chunks)!=ChukwaWriter.COMMIT_OK) {
+        Assert.fail("Commit status is not OK.");
+      }
+      HTable testTable = new HTable(table);
+      ResultScanner scanner = testTable.getScanner(columnFamily, qualifier);
+      for(Result res : scanner) {
+        Assert.assertEquals(new String(expectedValue), new String(res.getValue(columnFamily, qualifier)));
+      }
+      // Cleanup and return
+      scanner.close();
+      // Compare data in Hbase with generated chunks
+      util.shutdownMiniCluster();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+}

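Assuming HBaseWriter routes each chunk to the demux processor registered for its data type (TextParser, added below), the expected end state of this test is a single cell, which is what the ResultScanner loop asserts:

    // chunk: ChunkImpl("TextParser", "name", 1234567890, "1234567890 Key Value", null)
    // expected cell after hbw.add(chunks):
    //   table     "Test"
    //   family    "TestColumnFamily"
    //   qualifier "Key"
    //   value     "Value"
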
Added: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/TextParser.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/TextParser.java?rev=987831&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/TextParser.java (added)
+++ incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/test/demux/TextParser.java Sat Aug 21 21:17:04 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.test.demux;
+
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+@Table(name="Test",columnFamily="TestColumnFamily")
+public class TextParser extends AbstractProcessor {
+  static Logger log = Logger.getLogger(TextParser.class);
+  public static final String reduceType = "TestColumnFamily";
+  public final String recordType = this.getClass().getName();
+
+  public TextParser() {
+  }
+
+  public String getDataType() {
+    return recordType;
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+    ChukwaRecord record = new ChukwaRecord();
+    String[] parts = recordEntry.split("\\s");
+    record.add("timestamp", parts[0]);
+    record.add(parts[1], parts[2]);
+    key.setKey(parts[0]+"/"+parts[1]+"/"+parts[0]);
+    long timestamp = Long.parseLong(parts[0]);
+    this.buildGenericRecord(record, null, timestamp, reduceType);
+    output.collect(key, record);    
+  }
+}
\ No newline at end of file

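A short trace of parse() on the record body used in TestHBaseWriter above:

    // recordEntry: "1234567890 Key Value"
    // parts:       { "1234567890", "Key", "Value" }
    // record:      timestamp -> "1234567890", Key -> "Value"
    // key:         "1234567890/Key/1234567890"  (parts[0] at both ends)
    // buildGenericRecord(record, null, 1234567890L, "TestColumnFamily")
    // finalizes the record before output.collect(key, record).
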

