chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r763513 [1/2] - in /hadoop/chukwa/trunk: ./ bin/ conf/ lib/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extraction/archive/ src/java/org/apache/hadoop/chukwa/extrac...
Date Thu, 09 Apr 2009 04:18:37 GMT
Author: eyang
Date: Thu Apr  9 04:18:36 2009
New Revision: 763513

URL: http://svn.apache.org/viewvc?rev=763513&view=rev
Log:
CHUKWA-26. * DemuxManager, ArchiveManager and PostProcessorManager are now a single daemon process each.
           * Each one working independently from others, as soon as something is available.
           * Start-data-processor is now using those new daemons instead of processSinkFiles.sh
           * Daily will process a daily compaction only when all hourly would have been done.
           * Demux is now able to send NSCA commands to Nagios (Jerome Boulon via Eric Yang)


Added:
    hadoop/chukwa/trunk/bin/archive.sh
    hadoop/chukwa/trunk/bin/demux.sh
    hadoop/chukwa/trunk/bin/postProcess.sh
    hadoop/chukwa/trunk/lib/NagiosAppender-1.5.0.jar   (with props)
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/NagiosHelper.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxManager.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/bin/dailyRolling.sh
    hadoop/chukwa/trunk/bin/hourlyRolling.sh
    hadoop/chukwa/trunk/bin/start-data-processors.sh
    hadoop/chukwa/trunk/bin/stop-data-processors.sh
    hadoop/chukwa/trunk/bin/watchdog.sh
    hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr  9 04:18:36 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-78.  Added down sample SQL aggregation for job data, task data and utilization data. (Eric Yang)
+
     CHUKWA-17. Collect PS output every 10 minutes and parse into key/value pairs sequence file. (Cheng Zhang via Eric Yang)
 
     CHUKWA-55. Aggregate data from HDFS bytes usage, and Mapreduce Job slot time to compute user usage of the cluster. (Eric Yang)
@@ -30,6 +32,12 @@
 
   IMPROVEMENTS
 
+    CHUKWA-26.  * DemuxManager, ArchiveManager and PostProcessorManager are now a single daemon process each.
+                * Each one working independently from others, as soon as something is available.
+                * Start-data-processor is now using those new daemons instead of processSinkFiles.sh
+                * Daily will process a daily compaction only when all hourly would have been done.
+                * Demux is now able to send NSCA commands to Nagios. (Jerome Boulon via Eric Yang)
+
     CHUKWA-81.  Fix file descriptor leak.  (asrabkin)
 
     CHUKWA-29.  Added TaskTracker and DataNode client trace log parser and database loader.  (Chris Douglas via Eric Yang)

Added: hadoop/chukwa/trunk/bin/archive.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/archive.sh?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/bin/archive.sh (added)
+++ hadoop/chukwa/trunk/bin/archive.sh Thu Apr  9 04:18:36 2009
@@ -0,0 +1,42 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pid=$$
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+
+if [ "$CHUKWA_IDENT_STRING" = "" ]; then
+  export CHUKWA_IDENT_STRING="$USER"
+fi
+
+# Record this wrapper's pid under the same name the trap removes, so that
+# watchdog.sh and stop-data-processors.sh can locate and clean it up.
+trap 'rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-archive.sh.pid ${CHUKWA_PID_DIR}/ArchiveManager.pid; exit 0' 1 2 15
+echo "${pid}" > "$CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-archive.sh.pid"
+
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down archive.sh..."
+  kill -TERM `cat ${CHUKWA_PID_DIR}/ArchiveManager.pid`
+  echo "done"
+  exit 0
+fi
+
+# run ArchiveManager
+${JAVA_HOME}/bin/java -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DAPP=archiveManager -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_HOME}/conf:${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools} org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager
+

Modified: hadoop/chukwa/trunk/bin/dailyRolling.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/dailyRolling.sh?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/dailyRolling.sh (original)
+++ hadoop/chukwa/trunk/bin/dailyRolling.sh Thu Apr  9 04:18:36 2009
@@ -20,15 +20,17 @@
 bin=`cd "$bin"; pwd`
 . "$bin"/chukwa-config.sh
 
-HADOOP_OPTS="$HADOOP_OPTS -DAPP=dailyRolling -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} "
-export HADOOP_OPTS
-export HADOOP_CONF_DIR
-HADOOP_CMDE="${HADOOP_HOME}/bin/hadoop "
 
-  $HADOOP_CMDE jar ${CHUKWA_CORE} org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence true deleteRawdata true
+pidFile=${CHUKWA_PID_DIR}/DailyChukwaRecordRolling.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep DailyChukwaRecordRolling | grep -v grep | grep -o "[^ ].*" | grep ${pid} | wc -l`
+  if [ $ChildPIDRunningStatus -gt 0 ]; then
+      exit -1
+  fi
+fi
 
-  previousDay=`date --date="2 day ago" +%Y%m%d`
-  #previousDay=`date -v -2d +%Y%m%d`
-  echo "deleting /chukwa/postprocess/srcSink${previousDay}_*"
-  $HADOOP_CMDE dfs -rmr "/chukwa/postprocess/srcSink${previousDay}_*"
+rm -f ${pidFile}
+
+${JAVA_HOME}/bin/java -DAPP=dailyRolling -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_PID_DIR=${CHUKWA_PID_DIR} -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence true deleteRawdata true
 

Added: hadoop/chukwa/trunk/bin/demux.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/demux.sh?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/bin/demux.sh (added)
+++ hadoop/chukwa/trunk/bin/demux.sh Thu Apr  9 04:18:36 2009
@@ -0,0 +1,78 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pid=$$
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+
+if [ "$CHUKWA_IDENT_STRING" = "" ]; then
+  export CHUKWA_IDENT_STRING="$USER"
+fi
+
+# On shutdown, unregister the rolling cron jobs and remove both pid files.
+trap 'remove_cron;rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-demux.sh.pid ${CHUKWA_PID_DIR}/DemuxManager.pid; exit 0' 1 2 15
+echo "${pid}" > "$CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-demux.sh.pid"
+
+
+function remove_cron {
+    mkdir -p ${CHUKWA_HOME}/var/tmp >&/dev/null
+    crontab -l | grep -v ${CHUKWA_HOME}/bin/hourlyRolling.sh > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+    cat ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} | grep -v ${CHUKWA_HOME}/bin/dailyRolling.sh > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+    crontab ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+    rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+    rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+}
+
+function add_cron {
+    mkdir -p ${CHUKWA_HOME}/var/tmp >&/dev/null
+    crontab -l > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+    crontest=$?
+
+    if [ "X${crontest}" != "X0" ]; then
+      cat > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} << CRON
+16 * * * * ${CHUKWA_HOME}/bin/hourlyRolling.sh --config ${CHUKWA_CONF_DIR} >& ${CHUKWA_LOG_DIR}/hourly.log
+30 1 * * * ${CHUKWA_HOME}/bin/dailyRolling.sh --config ${CHUKWA_CONF_DIR} >& ${CHUKWA_LOG_DIR}/daily.log
+CRON
+    else
+      grep -v "${CHUKWA_HOME}/bin/hourlyRolling.sh" ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}  | grep -v "${CHUKWA_HOME}/bin/dailyRolling.sh" > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+      mv ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2 ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+      cat >> ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} << CRON
+16 * * * * ${CHUKWA_HOME}/bin/hourlyRolling.sh --config ${CHUKWA_CONF_DIR} >& ${CHUKWA_LOG_DIR}/hourly.log
+30 1 * * * ${CHUKWA_HOME}/bin/dailyRolling.sh --config ${CHUKWA_CONF_DIR} >& ${CHUKWA_LOG_DIR}/daily.log
+CRON
+    fi
+
+    # save crontab
+    echo -n "Registering cron jobs.."
+    crontab ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} > /dev/null 2>&1
+    rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+    echo "done"
+}
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down demux.sh..."
+  kill -TERM `cat ${CHUKWA_PID_DIR}/DemuxManager.pid`
+  echo "done"
+  exit 0
+fi
+
+
+ add_cron
+ 
+ # Run Demux
+ ${JAVA_HOME}/bin/java -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DAPP=demux -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.extraction.demux.DemuxManager 

Modified: hadoop/chukwa/trunk/bin/hourlyRolling.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/hourlyRolling.sh?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/hourlyRolling.sh (original)
+++ hadoop/chukwa/trunk/bin/hourlyRolling.sh Thu Apr  9 04:18:36 2009
@@ -20,11 +20,16 @@
 bin=`cd "$bin"; pwd`
 . "$bin"/chukwa-config.sh
 
-HADOOP_OPTS="$HADOOP_OPTS -DAPP=hourlyRolling -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} "
-export HADOOP_OPTS
-export HADOOP_CONF_DIR
-HADOOP_CMDE="${HADOOP_HOME}/bin/hadoop "
+pidFile=${CHUKWA_PID_DIR}/HourlyChukwaRecordRolling.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep HourlyChukwaRecordRolling | grep -v grep | grep -o "[^ ].*" | grep ${pid} | wc -l`
+  if [ $ChildPIDRunningStatus -gt 0 ]; then
+      exit -1
+  fi
+fi
 
-  $HADOOP_CMDE jar ${CHUKWA_CORE} org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence true deleteRawdata true
+rm -f ${pidFile}
 
+${JAVA_HOME}/bin/java -DAPP=hourlyRolling -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_PID_DIR=${CHUKWA_PID_DIR} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence true deleteRawdata true
 

Added: hadoop/chukwa/trunk/bin/postProcess.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/postProcess.sh?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/bin/postProcess.sh (added)
+++ hadoop/chukwa/trunk/bin/postProcess.sh Thu Apr  9 04:18:36 2009
@@ -0,0 +1,42 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pid=$$
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/chukwa-config.sh
+
+if [ "$CHUKWA_IDENT_STRING" = "" ]; then
+  export CHUKWA_IDENT_STRING="$USER"
+fi
+
+# Record this wrapper's pid; the trap removes both pid files on HUP/INT/TERM
+# so watchdog.sh and stop-data-processors.sh can manage the daemon.
+trap 'rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-postProcess.sh.pid ${CHUKWA_PID_DIR}/PostProcessorManager.pid; exit 0' 1 2 15
+echo "${pid}" > "$CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-postProcess.sh.pid"
+
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down postProcess.sh..."
+  kill -TERM `cat ${CHUKWA_PID_DIR}/PostProcessorManager.pid`
+  echo "done"
+  exit 0
+fi
+
+# run PostProcessorManager
+${JAVA_HOME}/bin/java -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DAPP=postProcess -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager 
+

Modified: hadoop/chukwa/trunk/bin/start-data-processors.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/start-data-processors.sh?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/start-data-processors.sh (original)
+++ hadoop/chukwa/trunk/bin/start-data-processors.sh Thu Apr  9 04:18:36 2009
@@ -33,7 +33,11 @@
 fi 
 
 # start data processors
-"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start processSinkFiles.sh watchdog
+#"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start processSinkFiles.sh watchdog
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start archive.sh watchdog
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start demux.sh watchdog
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start postProcess.sh watchdog
+
 
 # start database admin script
 "$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR start dbAdmin.sh

Modified: hadoop/chukwa/trunk/bin/stop-data-processors.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/stop-data-processors.sh?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/stop-data-processors.sh (original)
+++ hadoop/chukwa/trunk/bin/stop-data-processors.sh Thu Apr  9 04:18:36 2009
@@ -47,6 +47,86 @@
   echo " no $pidFile"
 fi
 
+
+# stop demux.sh
+pidFile=$CHUKWA_PID_DIR/DemuxManager.pid
+if [ -f $pidFile ]; then
+   echo -n "Shutting down demux.sh.."
+   DEMUX_PID=`head ${pidFile}`
+   kill -TERM ${DEMUX_PID}
+   for i in 1 2 5; do
+       test_pid=`ps ax | grep ${DEMUX_PID} | grep -v grep | grep DemuxManager | wc -l`
+       if [ $test_pid -ge 1 ]; then
+           sleep $i
+           kill -TERM ${DEMUX_PID}
+       else
+           break
+       fi
+   done
+   test_pid=`ps ax | grep ${DEMUX_PID} | grep -v grep | grep DemuxManager | wc -l`
+   if [ $test_pid -ge 1 ]; then
+       kill -9 ${DEMUX_PID} &>/dev/null
+   fi
+   rm -f ${pidFile}
+   rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-demux.sh.pid
+   echo "done"
+else
+  echo " no $pidFile"
+fi
+
+
+# stop postProcess.sh
+pidFile=$CHUKWA_PID_DIR/PostProcessorManager.pid
+if [ -f $pidFile ]; then
+   echo -n "Shutting down postProcess.sh ..."
+   POST_PROCESS_PID=`head ${pidFile}`
+   kill -TERM ${POST_PROCESS_PID}
+   for i in 1 2 5; do
+       test_pid=`ps ax | grep ${POST_PROCESS_PID} | grep -v grep | grep PostProcessorManager | wc -l`
+       if [ $test_pid -ge 1 ]; then
+           sleep $i
+           kill -TERM ${POST_PROCESS_PID}
+       else
+           break
+       fi
+   done
+   test_pid=`ps ax | grep ${POST_PROCESS_PID} | grep -v grep | grep PostProcessorManager | wc -l`
+   if [ $test_pid -ge 1 ]; then
+       kill -9 ${POST_PROCESS_PID} &>/dev/null
+   fi
+   rm -f ${pidFile}
+   rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-postProcess.sh.pid
+   echo "done"
+else
+  echo " no $pidFile"
+fi
+
+# stop archive.sh
+pidFile=$CHUKWA_PID_DIR/ArchiveManager.pid
+if [ -f $pidFile ]; then
+   echo -n "Shutting down archive.sh ..."
+   ARCHIVE_PID=`head ${pidFile}`
+   kill -TERM ${ARCHIVE_PID}
+   for i in 1 2 5; do
+       test_pid=`ps ax | grep ${ARCHIVE_PID} | grep -v grep | grep ChukwaArchiveManager | wc -l`
+       if [ $test_pid -ge 1 ]; then
+           sleep $i
+           kill -TERM ${ARCHIVE_PID}
+       else
+           break
+       fi
+   done
+   test_pid=`ps ax | grep ${ARCHIVE_PID} | grep -v grep | grep ChukwaArchiveManager | wc -l`
+   if [ $test_pid -ge 1 ]; then
+       kill -9 ${ARCHIVE_PID} &>/dev/null
+   fi
+   rm -f ${pidFile}
+   rm -f $CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-archive.sh.pid
+   echo "done"
+else
+  echo " no $pidFile"
+fi
+
 # stop dbAdmin.sh
 pidFile=$CHUKWA_PID_DIR/dbAdmin.pid
 if [ -f $pidFile ]; then  

Modified: hadoop/chukwa/trunk/bin/watchdog.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/watchdog.sh?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/watchdog.sh (original)
+++ hadoop/chukwa/trunk/bin/watchdog.sh Thu Apr  9 04:18:36 2009
@@ -41,6 +41,45 @@
 #  fi 
 #fi
 
+
+# monitor demux.sh
+pidFile=${CHUKWA_PID_DIR}/DemuxManager.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep DemuxManager | grep -v grep | grep -o "[^ ].*" | grep ${pid} | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting demux.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start demux.sh &
+  fi
+fi
+
+# monitor postProcess.sh
+pidFile=$CHUKWA_PID_DIR/PostProcessorManager.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep PostProcessorManager | grep -v grep | grep -o "[^ ].*" | grep ${pid} | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting postProcess.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start postProcess.sh &
+  fi
+fi
+
+# monitor archive.sh
+pidFile=${CHUKWA_PID_DIR}/ArchiveManager.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep ChukwaArchiveManager | grep -v grep | grep -o "[^ ].*" | grep ${pid} | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting archive.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start archive.sh &
+  fi
+fi
+
+
+
 # monitor collector
 pidFile=$CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-jettyCollector.sh.pid
 if [ -f $pidFile ]; then

Modified: hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template Thu Apr  9 04:18:36 2009
@@ -6,6 +6,13 @@
 <configuration>
 
 
+
+  <property>
+    <name>postProcessorManager.dbloader.ds</name>
+    <value>SystemMetrics,Df,Hadoop_dfs,Hadoop_jvm,Hadoop_mapred,Hadoop_rpc,MSSRGraph,MRJobCounters,NodeActivity,HodJob,HodMachine,Hadoop_dfs_FSDirectory,Hadoop_dfs_FSNamesystem,Hadoop_dfs_datanode,Hadoop_dfs_namenode,Hadoop_jvm_metrics,Hadoop_mapred_job,Hadoop_mapred_jobtracker,Hadoop_mapred_shuffleOutput,Hadoop_mapred_tasktracker,Hadoop_rpc_metrics,JobData,TaskData,HDFSUsage</value>
+    <description>Comma-separated list of data source types that the PostProcessorManager database loader should process</description>
+  </property>
+  
   <property>
     <name>chukwaArchiveBuilder.reduceCount</name>
     <value>5</value>

Added: hadoop/chukwa/trunk/lib/NagiosAppender-1.5.0.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/lib/NagiosAppender-1.5.0.jar?rev=763513&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/lib/NagiosAppender-1.5.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java Thu Apr  9 04:18:36 2009
@@ -56,6 +56,9 @@
       super.addResource(new Path(chukwaConf + "/chukwa-agent-conf.xml"));
       log.debug("added chukwa-agent-conf.xml to ChukwaConfiguration");
 
+      super.addResource(new Path(chukwaConf + "/chukwa-demux-conf.xml"));
+      log.debug("added chukwa-demux-conf.xml to ChukwaConfiguration");
+      
       super.addResource(new Path(chukwaConf + "/hadoop-site.xml"));
       log.debug("added hadoop-site.xml to ChukwaConfiguration");
 

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Thu Apr  9 04:18:36 2009
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction;
+
+public interface CHUKWA_CONSTANT {
+  // Keys of configuration properties looked up through ChukwaConfiguration.
+  public static final String WRITER_HDFS_FILESYSTEM_FIELD = "writer.hdfs.filesystem";
+  public static final String CHUKWA_ROOT_DIR_FIELD = "chukwaRootDir";
+  public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir";
+
+  public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir";
+  public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
+  public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir";
+  // Nagios/NSCA alerting configuration keys (Demux reports status to Nagios).
+  public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost";
+  public static final String CHUKWA_NAGIOS_PORT_FIELD = "demux.nagiosPort";
+  public static final String CHUKWA_REPORTING_HOST_FIELD = "demux.reportingHost4Nagios";
+  
+  
+  public static final String CHUKWA_DEMUX_REDUCER_COUNT_FIELD     = "demux.reducerCount";
+  // Default directory names; relative names are presumably resolved against the root dir above — TODO confirm in the managers.
+  public static final String DEFAULT_DEMUX_ROOT_DIR_NAME          = "/chukwa/";
+  public static final String DEFAULT_REPOS_DIR_NAME               = "repos/";
+  public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME  = "postProcess/";
+  public static final String DEFAULT_CHUKWA_LOGS_DIR_NAME         = "logs/";
+  
+  public static final String DEFAULT_DEMUX_PROCESSING_DIR_NAME    = "demuxProcessing/";
+  public static final String DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME     = "mrOutput/";
+  public static final String DEFAULT_DEMUX_MR_INPUT_DIR_NAME      = "mrInput/";
+  public static final String DEFAULT_DEMUX_IN_ERROR_DIR_NAME      = "inError/";
+  
+  public static final String DEFAULT_CHUKWA_DATASINK_DIR_NAME     = "dataSinkArchives/";
+  public static final String DEFAULT_FINAL_ARCHIVES               = "finalArchives/";
+  
+  public static final String DEFAULT_ARCHIVES_PROCESSING_DIR_NAME    = "archivesProcessing/";
+  public static final String DEFAULT_ARCHIVES_MR_OUTPUT_DIR_NAME     = "mrOutput/";
+  public static final String DEFAULT_ARCHIVES_MR_INPUT_DIR_NAME      = "mrInput/";
+  public static final String DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME      = "inError/";
+  // NOTE(review): constant-interface pattern; a final class with a private constructor is preferred.
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java Thu Apr  9 04:18:36 2009
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Daemon that watches the raw dataSink archive directory and periodically
+ * merges finished dataSink directories into final archives via the
+ * ChukwaArchiveBuilder map-reduce job.
+ */
+public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
+  static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
+  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+  
+  static final  int ONE_HOUR = 60 * 60 * 1000;
+  static final int ONE_DAY = 24*ONE_HOUR;
+  // stage no more than MAX_FILES dataSink files before launching an archive run
+  static final int MAX_FILES = 500;
+  
+  protected ChukwaConfiguration conf = null;
+  protected FileSystem fs = null;
+  protected boolean isRunning = true;
+  
+  public ChukwaArchiveManager() throws Exception { 
+    conf = new ChukwaConfiguration();
+    init();
+  }
+
+  public ChukwaArchiveManager(ChukwaConfiguration conf) throws Exception {
+    // BUGFIX: use the configuration handed in by the caller; the previous
+    // code ignored the parameter and always built a new ChukwaConfiguration.
+    this.conf = conf;
+    init();
+  }
+  
+  /** Connect to the HDFS filesystem named by WRITER_HDFS_FILESYSTEM_FIELD. */
+  protected void init() throws IOException, URISyntaxException {
+    String fsName = conf.get(WRITER_HDFS_FILESYSTEM_FIELD);
+    fs = FileSystem.get(new URI(fsName), conf);
+  }
+
+  public static void main(String[] args) throws Exception {
+    // Drop a pid file and register its cleanup on JVM shutdown so the
+    // watchdog scripts can track this daemon.
+    PidFile pFile = new PidFile("ArchiveManager");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    
+    ChukwaArchiveManager manager = new ChukwaArchiveManager();
+    manager.start();
+  }
+
+  /** Ask the start() loop to exit after its current iteration. */
+  public void shutdown() {
+    this.isRunning = false;
+  }
+  
+  /**
+   * Main daemon loop: resume any interrupted archive run, then repeatedly
+   * move hour-old dataSink directories into the archive MR input directory
+   * and merge them. Exits the JVM after 4 consecutive errors so an external
+   * watchdog can restart the process.
+   * @throws Exception on unrecoverable setup failure
+   */
+  public void start() throws Exception {
+    
+    String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_DEMUX_ROOT_DIR_NAME);
+    if ( ! chukwaRootDir.endsWith("/") ) {
+      chukwaRootDir += "/";
+    }
+    log.info("chukwaRootDir:" + chukwaRootDir);
+    
+    String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
+    if ( ! archiveRootDir.endsWith("/") ) {
+      archiveRootDir += "/";
+    }
+    log.info("archiveDir:" + archiveRootDir);
+    Path pArchiveRootDir = new Path(archiveRootDir);
+    setup(pArchiveRootDir);
+    
+    String archivesRootProcessingDir = chukwaRootDir + DEFAULT_ARCHIVES_PROCESSING_DIR_NAME;
+    String archivesMRInputDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_MR_INPUT_DIR_NAME;
+    String archivesMROutputDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_MR_OUTPUT_DIR_NAME;
+    String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
+    
+    Path pDailyRawArchivesInput = new Path(archiveRootDir);
+    Path pArchivesMRInputDir = new Path(archivesMRInputDir);
+    Path pArchivesRootProcessingDir = new Path(archivesRootProcessingDir);
+    Path pFinalArchiveOutput =  new Path(finalArchiveOutput);
+    
+    if (!archivesMRInputDir.endsWith("/")) {
+      archivesMRInputDir +="/";
+    }
+    setup( pArchivesRootProcessingDir );
+    setup( pDailyRawArchivesInput );
+    setup( pFinalArchiveOutput );
+    
+    int errorCount = 0;
+    
+    long lastRun = 0l;
+    
+    while (isRunning) {
+      try {
+        
+        if (errorCount >= 4 ) {
+          // it's better to exit, Watchdog will re-start it
+          log.warn("Too many error - bail out!");
+          System.exit(-1);
+        }
+        // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
+        //  to
+        // /chukwa/archives/final/<YYYYMMDD>_<TS>
+        
+        // A previous (possibly interrupted) run may have left staged files
+        // in the MR input dir: finish that archive before pulling new data.
+        if (fs.exists(pArchivesMRInputDir)) {
+          FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
+          if (days.length > 0) {
+            log.info("reprocessing current Archive input" +  days[0].getPath());
+            
+            runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);  
+            continue; 
+          }
+        }
+        
+        log.info("Raw Archive dir:" + pDailyRawArchivesInput);
+        long now = System.currentTimeMillis();
+        int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
+        FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);
+        
+        // We don't want to process DataSink file more than once every 2 hours
+        // for current day
+        if (daysInRawArchiveDir.length == 1 ) {
+          int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
+          long nextRun = lastRun + (2*ONE_HOUR) - (1*60*1000);// 2h -1min
+          if (workingDay == currentDay && now < nextRun) {
+            log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
+            Thread.sleep(30 * 60 * 1000);
+            continue;
+          }
+        }
+        
+        String dayArchivesMRInputDir = null;
+        for (FileStatus fsDay : daysInRawArchiveDir) {
+          dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
+          processDay(fsDay, dayArchivesMRInputDir,archivesMROutputDir, finalArchiveOutput);
+          lastRun = now;
+        }
+        
+      }catch (Throwable e) {
+        errorCount ++;
+        // BUGFIX: route the stack trace through log4j instead of printStackTrace
+        log.warn("Error in ChukwaArchiveManager main loop", e);
+      }
+      
+    }
+    
+  }
+  
+  /**
+   * Move dataSink directories for one day into the archive MR input dir,
+   * launching an archive job each time at least MAX_FILES files are staged.
+   * Only directories untouched for at least one hour are picked up, to avoid
+   * archiving data that is still being written.
+   * @param fsDay raw archive directory for one day (named YYYYMMDD)
+   * @param archivesMRInputDir staging directory for the archive MR job
+   * @param archivesMROutputDir output directory for the archive MR job
+   * @param finalArchiveOutput root of the final archives tree
+   * @throws Exception if the archive job fails
+   */
+  public void processDay(FileStatus fsDay, String archivesMRInputDir,
+      String archivesMROutputDir,String finalArchiveOutput) throws Exception {
+    FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(fsDay.getPath());
+    long now = System.currentTimeMillis();
+    
+    int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
+    int workingDay = Integer.parseInt(fsDay.getPath().getName());
+    
+    long oneHourAgo = now -  ONE_HOUR;
+    // a past day with no remaining dataSink dirs is finished: clean it up
+    if (dataSinkDirsInRawArchiveDir.length == 0 && workingDay < currentDay) {
+      fs.delete(fsDay.getPath(),false);
+      log.info("deleting raw dataSink dir for day:" + fsDay.getPath().getName());
+      return;
+    }
+    
+    int fileCount = 0;
+    for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
+      long modificationDate = fsDataSinkDir.getModificationTime();
+      if (modificationDate < oneHourAgo) {
+        log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
+        fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
+        moveDataSinkFilesToArchiveMrInput(fsDataSinkDir,archivesMRInputDir);
+        // process no more than MAX_FILES directories
+        if (fileCount >= MAX_FILES) {
+          log.info("processDay, reach capacity");
+          runArchive(archivesMRInputDir,archivesMROutputDir,finalArchiveOutput);  
+          fileCount = 0;
+        } else {
+          // BUGFIX: the directory was already moved above, not skipped; the
+          // old "skipping" message was misleading. Remaining staged files are
+          // picked up by the reprocessing branch of start() on the next loop.
+          log.info("processDay, staged " + fileCount + " files so far, waiting for more before archiving: " + fsDataSinkDir.getPath() );
+        }
+      }
+    }    
+  }
+  
+  /**
+   * Run the ChukwaArchiveBuilder MR job over the staged *.done files and
+   * move the job output under finalArchiveOutput/&lt;day&gt;/archive_&lt;ts&gt;.
+   * On a successful final move the MR input directory is deleted.
+   * @throws Exception if the archive job exits with a non-zero status
+   */
+  public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
+      String finalArchiveOutput) throws Exception {
+    String[] args = new String[3];
+    args[0] = "Stream";
+    args[1] = archivesMRInputDir + "*/*.done" ;
+    args[2] = archivesMROutputDir;
+    
+    Path pArchivesMRInputDir = new Path(archivesMRInputDir);
+    Path pArchivesMROutputDir = new Path(archivesMROutputDir);
+    
+    // leftover output from a failed run would make the MR job fail; remove it
+    if (fs.exists(pArchivesMROutputDir)) {
+      log.warn("Deleting mroutput dir for archive ...");
+      fs.delete(pArchivesMROutputDir, true);
+    }
+    
+    log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
+    int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(),args);
+    log.info("Archive result: " + res);
+    if (res != 0) {
+      throw new Exception("Archive result != 0");
+    }
+    
+    if (!finalArchiveOutput.endsWith("/")) {
+      finalArchiveOutput +="/";
+    }
+    // the staging dir is named after the day being archived
+    String day = pArchivesMRInputDir.getName();
+    finalArchiveOutput += day;
+    Path pDay = new Path(finalArchiveOutput);
+    setup(pDay);
+    
+    finalArchiveOutput += "/archive_" + System.currentTimeMillis();
+    Path pFinalArchiveOutput = new Path(finalArchiveOutput);
+    
+    log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);
+    
+    if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
+      log.info("deleting " + pArchivesMRInputDir);
+      fs.delete(pArchivesMRInputDir, true);
+    } else {
+      log.warn("move to final archive folder failed!");
+    }
+  }
+  
+  /**
+   * Move one raw dataSink directory into the archive MR input directory,
+   * creating the staging directory first if needed.
+   * @param fsDataSinkDir dataSink directory to stage
+   * @param archivesMRInputDir staging directory for the archive MR job
+   * @throws IOException on filesystem failure
+   */
+  public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
+      String archivesMRInputDir) throws IOException {
+    
+    if (!archivesMRInputDir.endsWith("/")) {
+      archivesMRInputDir +="/";
+    }
+    
+    Path pArchivesMRInputDir = new Path(archivesMRInputDir);
+    setup(pArchivesMRInputDir);
+    fs.rename(fsDataSinkDir.getPath(), pArchivesMRInputDir);
+    log.info("moving " + fsDataSinkDir.getPath() + " to " + pArchivesMRInputDir);
+  }
+  
+  /**
+   * Create directory if !exists
+   * @param directory
+   * @throws IOException
+   */
+  protected void setup(Path directory) throws IOException {
+     if ( ! fs.exists(directory)) {
+       fs.mkdirs(directory);
+     }
+  }
+ 
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java Thu Apr  9 04:18:36 2009
@@ -31,17 +31,24 @@
 
     System.out.println("Input directory:" + args[0]);
 
+    HashMap<String, String> dataSources = new HashMap<String, String>();
     for (int i = 1; i < args.length; i++) {
-      hashDatasources.put(args[i], "");
+      dataSources.put(args[i], "");
     }
 
     conf = new ChukwaConfiguration();
     fs = FileSystem.get(conf);
-    Path demuxDir = new Path(args[0]);
+    loadData( fs,args[0],dataSources );
+  }
+
+  public static void loadData(FileSystem fs,String inputDirectory, HashMap<String, String> dataSources) throws IOException {
+    DatabaseLoader.hashDatasources = dataSources;
+    DatabaseLoader.fs = fs;
+    Path demuxDir = new Path(inputDirectory);
     FileStatus fstat = fs.getFileStatus(demuxDir);
 
     if (!fstat.isDir()) {
-      throw new IOException(args[0] + " is not a directory!");
+      throw new IOException(inputDirectory + " is not a directory!");
     } else {
       // cluster Directory
       FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
@@ -66,8 +73,6 @@
           }
         }
       }
-
-      System.exit(0);
     }
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Thu Apr  9 04:18:36 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -63,9 +65,45 @@
     System.exit(-1);
   }
 
+  /**
+   * Check whether every existing hour directory (0-23) under the given
+   * daily stream directory has completed its hourly rolling, i.e. contains
+   * a file whose name includes "_HourlyDone_". A missing hour directory is
+   * tolerated; an hour directory present without an _HourlyDone_ file means
+   * hourly rolling is not finished for that day.
+   * @param dailyStreamDirectory repository directory for one dataSource/day
+   * @return true if daily rolling may proceed, false otherwise or on error
+   */
+  public static boolean hourlyRolling(String dailyStreamDirectory) {
+   
+    Path pHour = null;
+    try {
+      log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
+      
+      for (int i=0;i<24;i++) {
+        pHour = new Path(dailyStreamDirectory + "/" + i);
+        if (! fs.exists(pHour)) {
+          log.info("HourlyData is missing for:" + pHour);
+          continue;
+        } else {
+          FileStatus[] files = fs.listStatus(pHour);
+          boolean containsHourly = false;
+          for(FileStatus file: files) {
+            log.info("Debug checking" + file.getPath());
+            if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
+              containsHourly = true;
+              break;
+            }
+          }
+          if (containsHourly == false) {
+            log.info("HourlyDone is missing for : " + pHour);
+            return false;
+          }
+        }
+      }
+      return true;
+    }catch(Exception e) {
+      // BUGFIX: log through log4j with context instead of printStackTrace,
+      // so the failing directory shows up in the daemon log.
+      log.warn("hourlyRolling check failed for " + dailyStreamDirectory, e);
+      return false;
+    }
+  }
   public static void buildDailyFiles(String chukwaMainRepository,
       String tempDir, String rollingFolder, int workingDay) throws IOException {
     // process
+    
+    boolean alldone = true;
+    
     Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
     FileStatus[] clustersFS = fs.listStatus(dayPath);
     for (FileStatus clusterFs : clustersFS) {
@@ -78,16 +116,27 @@
         String dataSource = dataSourceFS.getPath().getName();
         // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
 
+
         // put the rotate flag
         fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
             + dataSource + "/" + workingDay + "/rotateDone"));
-
+        
+        if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
+          log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
+            + dataSource + "/" + workingDay );
+          alldone = false;
+          continue;
+        } 
+        
+        log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
+            + dataSource + "/" + workingDay + "/rotateDone");
+        
         // rotate
         // Merge
         String[] mergeArgs = new String[5];
         // input
         mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
-            + "/" + workingDay + "/[0-24]*/*.evt";
+            + "/" + workingDay + "/[0-9]*/*.evt";
         // temp dir
         mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
             + workingDay + "_" + System.currentTimeMillis();
@@ -95,7 +144,7 @@
         mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
             + "/" + workingDay;
         // final output fileName
-        mergeArgs[3] = dataSource + "_" + workingDay;
+        mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
         // delete rolling directory
         mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
             + "/" + dataSource;
@@ -128,17 +177,23 @@
         } // End if (!rollInSequence)
 
         // Delete the processed dataSourceFS
-        FileUtil.fullyDelete(fs, dataSourceFS.getPath());
+          FileUtil.fullyDelete(fs, dataSourceFS.getPath());
 
       } // End for(FileStatus dataSourceFS : dataSourcesFS)
 
       // Delete the processed clusterFs
-      FileUtil.fullyDelete(fs, clusterFs.getPath());
+      if (alldone == true) {
+        FileUtil.fullyDelete(fs, clusterFs.getPath());
+      }
+      
 
     } // End for(FileStatus clusterFs : clustersFS)
 
     // Delete the processed dayPath
-    FileUtil.fullyDelete(fs, dayPath);
+    if (alldone == true) {
+      FileUtil.fullyDelete(fs, dayPath);
+    }
+    
   }
 
   /**
@@ -146,6 +201,10 @@
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
+    
+    PidFile pFile = new PidFile("DailyChukwaRecordRolling");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    
     conf = new ChukwaConfiguration();
     String fsName = conf.get("writer.hdfs.filesystem");
     fs = FileSystem.get(new URI(fsName), conf);
@@ -195,9 +254,17 @@
     for (FileStatus dayFS : daysFS) {
       try {
         int workingDay = Integer.parseInt(dayFS.getPath().getName());
+        log.info("Daily working on :" + workingDay);
         if (workingDay < currentDay) {
-          buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
-              workingDay);
+          
+          try {
+            buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
+                workingDay);
+          } catch(Throwable e) {
+            e.printStackTrace();
+            log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay  ) ;
+          }
+          
         } // End if ( workingDay < currentDay)
       } // End Try workingDay =
         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
@@ -225,7 +292,7 @@
 
     FileInputFormat.setInputPaths(conf, args[0]);
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
+    conf.setJobPriority(JobPriority.LOW);
     JobClient.runJob(conf);
     return 0;
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Thu Apr  9 04:18:36 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -156,6 +157,7 @@
     conf.setOutputKeyClass(ChukwaRecordKey.class);
     conf.setOutputValueClass(ChukwaRecord.class);
     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
+    conf.setJobPriority(JobPriority.VERY_HIGH);
 
     List<String> other_args = new ArrayList<String>();
     for (int i = 0; i < args.length; ++i) {

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java Thu Apr  9 04:18:36 2009
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.util.NagiosHelper;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class DemuxManager implements CHUKWA_CONSTANT {  
+  static Logger log = Logger.getLogger(DemuxManager.class);
+
+  static int globalErrorcounter = 0;
+  
+  protected int ERROR_SLEEP_TIME = 60;
+  protected int NO_DATASINK_SLEEP_TIME = 20;
+
+  protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
+  protected int DEFAULT_REDUCER_COUNT = 8;
+  
+  protected int demuxReducerCount = 0; 
+  protected ChukwaConfiguration conf = null;
+  protected FileSystem fs = null;
+  protected int reprocess = 0;
+  protected boolean sendAlert = true;
+  
+  protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
+  protected volatile boolean isRunning = true;
+
+  final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
+    public boolean accept(Path file) {
+      return file.getName().endsWith(".done");
+    }     
+  };
+
+
+  public static void main(String[] args) throws Exception {
+    // Drop a pid file and register its removal on JVM shutdown so the
+    // watchdog scripts can track and stop this daemon.
+    PidFile pFile = new PidFile("DemuxManager");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    
+    DemuxManager manager = new DemuxManager();
+    manager.start();
+
+  }
+
+  /** Build a manager with a freshly loaded Chukwa configuration. */
+  public DemuxManager() throws Exception {
+    this.conf = new ChukwaConfiguration();
+    init();
+  }
+
+  /** Build a manager around the supplied configuration. */
+  public DemuxManager(ChukwaConfiguration conf) throws Exception {
+    this.conf = conf;
+    init();
+  }
+
+  // Connect to the HDFS filesystem named by WRITER_HDFS_FILESYSTEM_FIELD.
+  protected void init() throws IOException, URISyntaxException {
+    String fsName = conf.get(WRITER_HDFS_FILESYSTEM_FIELD);
+    fs = FileSystem.get(new URI(fsName), conf);
+  }
+
+  /** Ask the start() loop to exit after its current iteration. */
+  public void shutdown() {
+    this.isRunning = false;
+  }
+
+
+  /** @return how many times the current dataSink batch has been retried */
+  public int getReprocess() {
+    return reprocess;
+  }
+
+  /**
+   * Start the Demux Manager daemon: loop forever, moving new dataSink files
+   * into the demux MR input directory, running the Demux job, and reporting
+   * status to Nagios when alerting is configured. Exits the JVM after more
+   * than 5 accumulated errors so an external watchdog can restart it.
+   * @throws Exception on unrecoverable setup failure
+   */
+  public void start() throws Exception {
+
+     String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_DEMUX_ROOT_DIR_NAME);
+     if ( ! chukwaRootDir.endsWith("/") ) {
+       chukwaRootDir += "/";
+     }
+     log.info("chukwaRootDir:" + chukwaRootDir);
+
+     String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
+     String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
+     String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
+     String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
+
+     String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
+     if ( ! dataSinkDir.endsWith("/") ) {
+       dataSinkDir += "/";
+     }
+     log.info("dataSinkDir:" + dataSinkDir);
+     
+     String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
+     if ( ! postProcessDir.endsWith("/") ) {
+       postProcessDir += "/";
+     }
+     log.info("postProcessDir:" + postProcessDir);
+     
+     String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
+     if ( ! archiveRootDir.endsWith("/") ) {
+       archiveRootDir += "/";
+     }
+     log.info("archiveRootDir:" + archiveRootDir);
+     
+     demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
+     log.info("demuxReducerCount:" + demuxReducerCount);
+     
+     String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
+     int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
+     String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
+     
+     log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:" 
+         + nagiosPort + ", reportingHost:" + reportingHost);
+     
+     
+     // BUGFIX: null checks must come before the length() calls; the previous
+     // order (reportingHost.length() == 0 || reportingHost == null) threw a
+     // NullPointerException whenever the reporting-host key was not configured.
+     if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
+       sendAlert = false;
+       log.warn("Alerting is OFF");
+     }
+     
+     boolean demuxReady = false;
+
+     
+     while (isRunning) {
+       try {
+         demuxReady = false;
+
+         if (globalErrorcounter > 5) {
+           log.warn("==================\nToo many errors, Bail out!\n==================");
+           System.exit(-1);
+         }
+         
+         // Check for anomalies
+         if (checkDemuxOutputDir(demuxOutputDir) == true) {
+           // delete current demux output dir
+           if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
+             log.warn("Cannot delete an existing demux output directory!");
+             throw new IOException("Cannot move demuxOutput to postProcess!");
+           }
+           continue;
+         } else if (checkDemuxInputDir(demuxInputDir) == true) { // dataSink already there
+           reprocess++;
+
+           // Data has been processed more than 3 times ... move to InError directory
+           if (reprocess > 3) {
+             if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
+               log.warn("Cannot move dataSink files to DemuxErrorDir!");
+               throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
+             }
+             reprocess = 0;
+             continue;
+           }
+
+           log.error("Demux inputDir aready contains some dataSink files,"
+               + " going to reprocess, reprocessCount=" + reprocess);
+           demuxReady = true;
+         } else { // standard code path
+           reprocess = 0;
+           // Move new dataSink Files
+           if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
+             demuxReady = true; // if any are available
+           } else {
+             demuxReady = false; // if none
+           }
+         }
+
+         // start a new demux ?
+         if (demuxReady == true) {
+          boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
+               postProcessDir, archiveRootDir);
+          sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
+          
+         } else {
+           log.info("Demux not ready so going to sleep ...");
+           Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
+         }
+       }catch(Throwable e) {
+         log.warn(e);
+         globalErrorcounter ++;
+         sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
+         try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
+         catch (InterruptedException e1) {/*do nothing*/ }
+         init();
+       }
+     }
+   }
+
+
+   /**
+    * Send NSCA (passive check) status to Nagios: one result for the demux
+    * run itself and one for the state of the demux inError directory.
+    * No-op when alerting is disabled (sendAlert == false).
+    * @param nagiosHost host running the NSCA daemon
+    * @param nagiosPort NSCA daemon port
+    * @param reportingHost host name this status is reported under
+    * @param demuxInErrorDir directory holding dataSink batches that failed demux
+    * @param demuxStatus true if the demux run succeeded
+    * @param demuxException message of the failure, or null on success
+    */
+  protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
+        String demuxInErrorDir,boolean demuxStatus,String demuxException) {
+      
+     if (sendAlert == false) {
+       return;
+     }
+      
+     // NOTE(review): demuxInErrorStatus starts true and is only cleared when
+     // the inError dir exists AND is empty — a not-yet-created dir therefore
+     // reports CRITICAL; confirm this is the intended behavior.
+     boolean demuxInErrorStatus = true;
+     String demuxInErrorMsg = "";
+     try {
+       Path pDemuxInErrorDir = new Path(demuxInErrorDir);
+       if ( fs.exists(pDemuxInErrorDir)) {
+         FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
+         if (demuxInErrorDirs.length == 0) {
+           demuxInErrorStatus = false;
+         }          
+       }
+     } catch (Throwable e) {
+       demuxInErrorMsg = e.getMessage();
+       log.warn(e);
+     }
+     
+     // send Demux status
+     if (demuxStatus == true) {
+       NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux OK",NagiosHelper.NAGIOS_OK);
+     } else {
+       NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
+     }
+     
+     // send DemuxInErrorStatus
+     if (demuxInErrorStatus == false) {
+       NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError OK",NagiosHelper.NAGIOS_OK);
+     } else {
+       NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
+     }
+     
+   }
+   
+   /**
+    * Process Data, i.e. 
+    * - run demux
+    * - move demux output to postProcessDir
+    * - move dataSink file to archiveDir
+    * 
+    * @param dataSinkDir directory where collectors write dataSink files
+    * @param demuxInputDir MR input directory holding the staged dataSink files
+    * @param demuxOutputDir MR output directory of the demux job
+    * @param postProcessDir destination for successful demux output
+    * @param archiveDir destination for the processed dataSink files
+    * @return True iff succeed
+    * @throws IOException if a post-demux move fails
+    */
+    protected boolean processData(String dataSinkDir, String demuxInputDir,
+       String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
+
+     boolean demuxStatus = false;
+
+     long startTime = System.currentTimeMillis();
+     demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
+     log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
+
+     // On failure nothing is moved: the dataSink files stay in demuxInputDir
+     // so the next start() iteration re-detects and reprocesses them.
+     if (demuxStatus == false) {
+       log.warn("Demux failed!");
+     } else {
+
+       // Move demux output to postProcessDir 
+       if (checkDemuxOutputDir(demuxOutputDir)) {
+         if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
+           log.warn("Cannot move demuxOutput to postProcess! bail out!");
+           throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
+         } 
+       } else {
+         log.warn("Demux processing OK but no output");
+       }
+
+       // Move DataSink Files to archiveDir
+       if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
+         log.warn("Cannot move datasinkFile to archive! bail out!");
+         throw new IOException("Cannot move datasinkFile to archive! bail out!");
+       }
+     }
+     
+     return demuxStatus;
+   }
+
+
+   /**
+    * Submit and run the Demux map-reduce job on the given input directory,
+    * passing "-r <demuxReducerCount>" to set the reducer count.
+    * @param demuxInputDir MR input directory holding the staged dataSink files
+    * @param demuxOutputDir MR output directory for the demux job
+    * @return true if Demux succeeded (ToolRunner exit code 0)
+    */
+   protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+     String[] demuxParams = new String[4];
+     demuxParams[0] = "-r";
+     demuxParams[1] = "" + demuxReducerCount;
+
+     demuxParams[2] = demuxInputDir;
+     demuxParams[3] = demuxOutputDir;
+
+     try {
+       return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
+     } catch (Throwable e) {
+       // BUGFIX: route the failure through log4j (with the failing input dir)
+       // instead of printStackTrace, so it lands in the daemon log.
+       log.warn("Demux job failed for input " + demuxInputDir, e);
+       globalErrorcounter ++;
+     }
+     return false;
+   }
+
+
+
+   /**
+    * Move dataSink files to the Demux input directory, at most
+    * DEFAULT_MAX_FILES_PER_DEMUX files per invocation.
+    * @param dataSinkDir directory collectors write dataSink files into
+    * @param demuxInputDir directory the next demux run will read from
+    * @return true if there's any dataSink file ready to be processed
+    * @throws IOException
+    */
+   protected boolean moveDataSinkFilesToDemuxInputDirectory(
+       String dataSinkDir, String demuxInputDir) throws IOException {
+     Path pDataSinkDir = new Path(dataSinkDir);
+     Path pDemuxInputDir = new Path(demuxInputDir);
+     log.info("dataSinkDir: " + dataSinkDir);
+     log.info("demuxInputDir: " + demuxInputDir);
+
+     boolean containsFile = false;
+
+     // FileSystem.listStatus may return null when the path does not exist;
+     // guard against the NPE and report "nothing to process".
+     FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
+     if (dataSinkFiles == null || dataSinkFiles.length == 0) {
+       return false;
+     }
+     setup(pDemuxInputDir);
+
+     int maxFilesPerDemux = 0;
+     for (FileStatus fstatus : dataSinkFiles) {
+       boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
+       log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
+       maxFilesPerDemux ++;
+       containsFile = true;
+       if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
+         // Cap the batch size so one demux run stays bounded
+         log.info("Max File per Demux reached:" + maxFilesPerDemux);
+         break;
+       }
+     }
+     return containsFile;
+   }
+
+
+
+
+   /**
+    * Move the demux input folder into the demux error area, under a
+    * per-day (<YYYYMMDD>) sub-directory.
+    * @param dataSinkDir source folder, e.g. chukwa/demux/inputDir
+    * @param demuxErrorDir error root, e.g. /chukwa/demux/inError
+    * @return true if the folder was moved to /chukwa/demux/inError/<YYYYMMDD>/demuxInputDirXXX
+    * @throws IOException
+    */
+   protected boolean moveDataSinkFilesToDemuxErrorDirectory(
+       String dataSinkDir, String demuxErrorDir) throws IOException {
+     String dayDir = demuxErrorDir + "/" + dayTextFormat.format(System.currentTimeMillis());
+     return moveFolder(dataSinkDir, dayDir, "demuxInputDir");
+   }
+
+   /**
+    * Move the consumed demux input folder into the archive area, under a
+    * per-day (<YYYYMMDD>) sub-directory.
+    * @param demuxInputDir source folder, e.g. chukwa/demux/inputDir
+    * @param archiveDirectory archive root, e.g. /chukwa/archives
+    * @return true if the folder was moved to /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
+    * @throws IOException
+    */
+   protected boolean moveDataSinkFilesToArchiveDirectory(
+       String demuxInputDir, String archiveDirectory) throws IOException {
+     String dayDir = archiveDirectory + "/" + dayTextFormat.format(System.currentTimeMillis());
+     return moveFolder(demuxInputDir, dayDir, "dataSinkDir");
+   }
+
+   /**
+    * Move sourceFolder inside destFolder
+    * @param demuxOutputDir: ex chukwa/demux/outputDir 
+    * @param postProcessDirectory: ex /chukwa/postProcess
+    * @return true if able to move chukwa/demux/outputDir to /chukwa/postProcess/demuxOutputDirXXX
+    * @throws IOException 
+    */
+   protected  boolean moveDemuxOutputDirToPostProcessDirectory(
+       String demuxOutputDir, String postProcessDirectory) throws IOException {
+     return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
+   }
+
+
+   /**
+    * Test whether demuxInputDir exists and is a directory.
+    * @param demuxInputDir path to check
+    * @return true if demuxInputDir exists
+    * @throws IOException
+    */
+   protected boolean checkDemuxInputDir(String demuxInputDir) throws IOException {
+     return dirExists(demuxInputDir);
+   }
+
+   /**
+    * Test whether demuxOutputDir exists and is a directory.
+    * @param demuxOutputDir path to check
+    * @return true if demuxOutputDir exists
+    * @throws IOException
+    */
+   protected boolean checkDemuxOutputDir(String demuxOutputDir) throws IOException {
+     return dirExists(demuxOutputDir);
+   }
+
+
+   /**
+    * Recursively delete the DemuxOutput directory.
+    * @param demuxOutputDir path to delete
+    * @return true if the delete succeeded
+    * @throws IOException
+    */
+   protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException {
+     Path outputPath = new Path(demuxOutputDir);
+     return fs.delete(outputPath, true);
+   }
+
+   /**
+    * Create the given directory if it does not already exist.
+    * @param directory directory to create
+    * @throws IOException
+    */
+   protected void setup(Path directory) throws IOException {
+      if (fs.exists(directory)) {
+        return;
+      }
+      fs.mkdirs(directory);
+    }
+
+    /** 
+     * Check whether the given path exists and is a directory.
+     * @param directory path to check
+     * @return true if directory exists and is a directory
+     */
+   protected boolean dirExists(String directory) throws IOException {
+      Path pDirectory = new Path(directory);
+      if (!fs.exists(pDirectory)) {
+        return false;
+      }
+      return fs.getFileStatus(pDirectory).isDir();
+    }
+
+    /**
+     * Move srcDir inside destDir, renamed to destDir/<prefix>_<timestamp>.
+     * Creates destDir first if it is missing.
+     * @param srcDir folder to move
+     * @param destDir destination parent folder
+     * @param prefix name prefix for the relocated folder
+     * @return true if the rename succeeded
+     * @throws IOException
+     */ 
+   protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
+      String normalizedDest = destDir.endsWith("/") ? destDir : destDir + "/";
+      Path pSrcDir = new Path(srcDir);
+      setup(new Path(normalizedDest));
+      String finalDest = normalizedDest + prefix + "_" + System.currentTimeMillis();
+      return fs.rename(pSrcDir, new Path(finalDest));
+    }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Thu Apr  9 04:18:36 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -76,13 +78,13 @@
       Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
           + workingDay + "/" + workingHour + "/" + cluster);
       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+      
       for (FileStatus dataSourceFS : dataSourcesFS) {
         String dataSource = dataSourceFS.getPath().getName();
-        // Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
+        // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
 
         // put the rotate flag
-        fs
-            .mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
                 + dataSource + "/" + workingDay + "/" + workingHour
                 + "/rotateDone"));
 
@@ -99,7 +101,7 @@
         mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
             + "/" + workingDay + "/" + workingHour;
         // final output fileName
-        mergeArgs[3] = dataSource + "_" + workingDay + "_" + workingHour;
+        mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
         // delete rolling directory
         mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
             + workingHour + "/" + cluster + "/" + dataSource;
@@ -150,6 +152,9 @@
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
+    PidFile pFile = new PidFile("HourlyChukwaRecordRolling");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    
     conf = new ChukwaConfiguration();
     String fsName = conf.get("writer.hdfs.filesystem");
     fs = FileSystem.get(new URI(fsName), conf);
@@ -211,8 +216,13 @@
                                                                           // hour
           ) {
 
-            buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
-                workingDay, workingHour);
+            try {
+              buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
+                  workingDay, workingHour);
+            } catch(Throwable e) {
+              e.printStackTrace();
+              log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
+            }
 
           } // End if ( (workingDay < currentDay) || ( (workingDay ==
             // currentDay) && (intHour < currentHour) ) )
@@ -220,6 +230,7 @@
       } // End Try workingDay =
         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
       catch (NumberFormatException e) { /* Not a standard Day directory skip */
+        log.warn("Exception in hourlyRolling:", e);
       }
 
     } // for(FileStatus dayFS : daysFS)
@@ -243,7 +254,7 @@
 
     FileInputFormat.setInputPaths(conf, args[0]);
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
+    conf.setJobPriority(JobPriority.LOW);
     JobClient.runJob(conf);
     return 0;
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Thu Apr  9 04:18:36 2009
@@ -190,20 +190,16 @@
       if (!fs.exists(destFilePath)) {
         log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
             + destFilePath);
-        // fs.rename(recordFile,destFilePath);
-        FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
-        // FileUtil.replaceFile(new File(recordFile.toUri()), new
-        // File(destFilePath.toUri()));
+        boolean rename = fs.rename(recordFile,destFilePath);
         done = true;
-        log.info(">>>>>>>>>>>> after Rename" + destFilePath);
+        log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
       } else {
-        log.info("Start MoveToRepository main()");
+        log.info("increment spill for :" + destFilePath);
       }
       count++;
-      // Just put a limit here
-      // TODO read from config
+
       if (count > 1000) {
-        throw new IOException("too many files in this directory: " + destDir);
+        log.warn("too many files in this directory: " + destDir);
       }
     } while (!done);
   }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Thu Apr  9 04:18:36 2009
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.extraction.database.DatabaseLoader;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
+
+public class PostProcessorManager implements CHUKWA_CONSTANT{
+  static Logger log = Logger.getLogger(PostProcessorManager.class);
+  
+  protected static HashMap<String, String> dataSources = new HashMap<String, String>();
+  public static int errorCount = 0;
+  
+  protected int ERROR_SLEEP_TIME = 60;
+  protected ChukwaConfiguration conf = null;
+  protected FileSystem fs = null;
+  protected volatile boolean isRunning = true;
+  
+  final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
+    public boolean accept(Path file) {
+      return file.getName().startsWith("demuxOutputDir");
+    }     
+  };
+
+  
+  public PostProcessorManager() throws Exception {
+    conf = new ChukwaConfiguration();
+    init();
+  }
+  
+  public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
+    this.conf = conf;
+    init();
+  }
+  
+  protected void init() throws IOException, URISyntaxException {
+    String fsName = conf.get(WRITER_HDFS_FILESYSTEM_FIELD);
+    fs = FileSystem.get(new URI(fsName), conf);
+  }
+  
+  public static void main(String[] args) throws Exception {
+ 
+    PidFile pFile = new PidFile("PostProcessorManager");
+    Runtime.getRuntime().addShutdownHook(pFile);
+    
+
+    
+    PostProcessorManager postProcessorManager = new PostProcessorManager();
+    postProcessorManager.start();
+  }
+  
+  public void shutdown() {
+    this.isRunning = false;
+  }
+  
+  public void start() {
+    
+    String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
+    if ( ! chukwaRootDir.endsWith("/") ) {
+      chukwaRootDir += "/";
+    }
+    log.info("chukwaRootDir:" + chukwaRootDir);
+    
+    String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
+    if ( ! postProcessDir.endsWith("/") ) {
+      postProcessDir += "/";
+    }
+    
+    String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
+    if ( ! chukwaRootReposDir.endsWith("/") ) {
+      chukwaRootReposDir += "/";
+    }
+ 
+    dataSources = new HashMap<String, String>();
+    String[] datasources = conf.getStrings("postProcessorManager.dbloader.ds");
+    if (datasources == null || datasources.length == 0) {
+      log.warn("Cannot read postProcessorManager.dbloader.ds from configuration, bail out!");
+      System.exit(-1);
+    }
+    for(String ds: datasources) {
+      dataSources.put(ds.trim(), "");
+      log.info("Add " + ds + " to PostProcessorManager");
+    }
+
+    
+    Path postProcessDirectory = new Path(postProcessDir);
+    while (isRunning) {
+      
+      if (errorCount >= 4 ) {
+        // it's better to exit, Watchdog will re-strat it
+        log.warn("Too many error - bail out!");
+        System.exit(-1);
+      }
+      
+      try {
+        FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
+        List<String> directories = new ArrayList<String>();
+        for (FileStatus fstatus : demuxOutputDirs) {
+          directories.add(fstatus.getPath().getName());
+        }
+        
+        if (demuxOutputDirs.length == 0) {
+          try { Thread.sleep(10*1000);} catch(InterruptedException e) { /* do nothing*/}
+          continue;
+        }
+        
+        Collections.sort(directories);
+        
+        System.out.println(directories);
+        
+        String directoryToBeProcessed = null;
+        long start = 0;
+        
+        for(String directory : directories) {
+          directoryToBeProcessed = postProcessDirectory + "/"+ directory;
+          
+          log.info("PostProcess Start, directory:" + directoryToBeProcessed);
+          start = System.currentTimeMillis();
+          
+          if ( processDemuxOutput(directoryToBeProcessed) == true) {
+            if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) {
+              deleteDirectory(directoryToBeProcessed);
+            }else {
+              log.warn("Error in movetoMainRepository for :" + directoryToBeProcessed);
+              throw new RuntimeException("");
+            }
+            
+          } else {
+            log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
+            throw new RuntimeException("");
+          }
+          log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
+          log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+        }
+       
+      } catch (Throwable e) {
+        errorCount ++;
+        log.warn(e);
+        try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
+        catch (InterruptedException e1) {/*do nothing*/ }
+      }
+    }
+  }
+  
+  public boolean processDemuxOutput(String directory) throws IOException {
+    long start = System.currentTimeMillis();
+    DatabaseLoader.loadData(fs,directory, dataSources);
+    log.info("loadData Duration:" + (System.currentTimeMillis() - start));
+    
+    return true;
+  }
+  
+  public boolean movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
+    String[] args = {sourceDirectory,repoRootDirectory};
+    long start = System.currentTimeMillis();
+    MoveToRepository.main(args);
+    log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
+    return true;
+  }
+  
+  public boolean deleteDirectory(String directory) throws IOException {
+   return fs.delete(new Path(directory), true);
+  }
+  
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=763513&r1=763512&r2=763513&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Thu Apr  9 04:18:36 2009
@@ -553,7 +553,7 @@
 
             // FIXME: Hack to make the log file readable by chukwa user. 
             if(System.getProperty("os.name").intern()=="Linux".intern()) {
-              Runtime.getRuntime().exec("chmod g+rw "+getFile());
+              Runtime.getRuntime().exec("chmod 644 "+getFile());
             }
             
             // Watchdog is watching for ChukwaAgent only once every 5 minutes,

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/NagiosHelper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/NagiosHelper.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/NagiosHelper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/NagiosHelper.java Thu Apr  9 04:18:36 2009
@@ -0,0 +1,22 @@
+package org.apache.hadoop.chukwa.util;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.nagios.Nsca;
+
+/**
+ * Thin wrapper around the log4j NagiosAppender {@code Nsca} client, used by
+ * the data processors to push passive check results (NSCA commands) to a
+ * Nagios server. Sending is best effort: a Nagios outage must never take a
+ * data-processing daemon down.
+ */
+public class NagiosHelper {
+  static Logger log = Logger.getLogger(NagiosHelper.class);
+  public static final int NAGIOS_OK       = Nsca.NAGIOS_OK;
+  public static final int NAGIOS_WARN     = Nsca.NAGIOS_WARN;
+  public static final int NAGIOS_CRITICAL = Nsca.NAGIOS_CRITICAL;
+  public static final int NAGIOS_UNKNOWN  = Nsca.NAGIOS_UNKNOWN;
+  
+  /**
+   * Send one NSCA passive check result to Nagios. Failures are logged and
+   * swallowed on purpose (fire-and-forget).
+   * @param nagiosHost NSCA daemon host
+   * @param nagiosPort NSCA daemon port
+   * @param reportingHost host the check result is reported for
+   * @param reportingService service name the check result is reported for
+   * @param msg status message
+   * @param state one of the NAGIOS_* constants above
+   */
+  public static void sendNsca(String nagiosHost,int nagiosPort,String reportingHost,String reportingService,String msg,int state) {
+    Nsca nsca = new Nsca();
+    try {
+      nsca.send_nsca(nagiosHost, ""+nagiosPort, reportingHost, reportingService, msg, state, 1);
+    } catch (Throwable e) {
+      // Log with context and stack trace instead of the bare throwable
+      log.warn("Failed to send NSCA notification to " + nagiosHost + ":" + nagiosPort, e);
+    }
+     
+  }
+}



Mime
View raw message