Return-Path: Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org Received: (qmail 59008 invoked from network); 22 Jun 2009 23:56:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Jun 2009 23:56:19 -0000 Received: (qmail 46167 invoked by uid 500); 22 Jun 2009 23:56:30 -0000 Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org Received: (qmail 46155 invoked by uid 500); 22 Jun 2009 23:56:30 -0000 Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: chukwa-dev@hadoop.apache.org Delivered-To: mailing list chukwa-commits@hadoop.apache.org Received: (qmail 46145 invoked by uid 99); 22 Jun 2009 23:56:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jun 2009 23:56:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jun 2009 23:56:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0E2D6238887A; Mon, 22 Jun 2009 23:55:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r787449 - in /hadoop/chukwa: branches/chukwa-0.2/ branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/ branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/ trunk/ trunk/src/java/org/apache/hadoop/chukwa/extra... Date: Mon, 22 Jun 2009 23:55:58 -0000 To: chukwa-commits@hadoop.apache.org From: eyang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090622235559.0E2D6238887A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: eyang Date: Mon Jun 22 23:55:58 2009 New Revision: 787449 URL: http://svn.apache.org/viewvc?rev=787449&view=rev Log: CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory. (Jerome Boulon via Eric Yang) Modified: hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java hadoop/chukwa/trunk/CHANGES.txt hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Modified: hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt (original) +++ hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt Mon Jun 22 23:55:58 2009 @@ -34,6 +34,8 @@ IMPROVEMENTS + CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory. (Jerome Boulon via Eric Yang) + CHUKWA-318. Added check for disk capacity for collector to write. (Jerome Boulon via Eric Yang) CHUKWA-276. Fix hourly and daily rolling to use a single reducer. (Jerome Boulon via Eric Yang) Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original) +++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Mon Jun 22 23:55:58 2009 @@ -24,8 +24,10 @@ public static final String CHUKWA_ROOT_DIR_FIELD = "chukwaRootDir"; public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir"; + public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir"; public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir"; + public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir"; public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir"; public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost"; @@ -38,6 +40,7 @@ public static final String DEFAULT_DEMUX_ROOT_DIR_NAME = "/chukwa/"; public static final String DEFAULT_REPOS_DIR_NAME = "repos/"; public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME = "postProcess/"; + public static final String DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME = "postProcessInError/"; public static final String DEFAULT_CHUKWA_LOGS_DIR_NAME = "logs/"; public static final String DEFAULT_DEMUX_PROCESSING_DIR_NAME = "demuxProcessing/"; Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original) +++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Mon Jun 22 23:55:58 2009 @@ -102,6 +102,12 @@ chukwaRootReposDir += "/"; } + String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME); + if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) { + chukwaPostProcessInErrorDir += "/"; + } + + dataSources = new HashMap(); Path postProcessDirectory = new Path(postProcessDir); while (isRunning) { @@ -134,21 +140,22 @@ log.info("PostProcess Start, directory:" + directoryToBeProcessed); start = System.currentTimeMillis(); - - if ( processDemuxOutput(directoryToBeProcessed) == true) { - if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) { - deleteDirectory(directoryToBeProcessed); - }else { - log.warn("Error in movetoMainRepository for :" + directoryToBeProcessed); - throw new RuntimeException(""); + + try { + if ( processDemuxPigOutput(directoryToBeProcessed) == true) { + if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) { + deleteDirectory(directoryToBeProcessed); + log.info("PostProcess Stop, directory:" + directoryToBeProcessed); + log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start)); + continue; + } } - - } else { + // if we are here it's because something bad happen during processing log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed); - throw new RuntimeException(""); - } - log.info("PostProcess Stop, directory:" + directoryToBeProcessed); - log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start)); + moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); + } catch (Throwable e) { + log.warn("Error in processDemuxOutput:" ,e); + } } } catch (Throwable e) { @@ -160,7 +167,7 @@ } } - public boolean processDemuxOutput(String directory) throws IOException { + public boolean processDemuxPigOutput(String directory) throws IOException { long start = System.currentTimeMillis(); try { String[] classes = conf.get(POST_DEMUX_DATA_LOADER).split(","); @@ -196,6 +203,21 @@ return true; } + public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception { + Path inErrorDir = new Path(inErrorDirectory); + if (!fs.exists(inErrorDir)) { + fs.mkdirs(inErrorDir); + } + + if (inErrorDirectory.endsWith("/")) { + inErrorDirectory += "/"; + } + String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis(); + fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory)); + log.warn("Error in postProcess :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory); + return true; + } + public boolean deleteDirectory(String directory) throws IOException { return fs.delete(new Path(directory), true); } Modified: hadoop/chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/trunk/CHANGES.txt (original) +++ hadoop/chukwa/trunk/CHANGES.txt Mon Jun 22 23:55:58 2009 @@ -34,6 +34,8 @@ IMPROVEMENTS + CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory. (Jerome Boulon via Eric Yang) + CHUKWA-318. Added check for disk capacity for collector to write. (Jerome Boulon via Eric Yang) CHUKWA-276. Fix hourly and daily rolling to use a single reducer. (Jerome Boulon via Eric Yang) Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Mon Jun 22 23:55:58 2009 @@ -24,8 +24,10 @@ public static final String CHUKWA_ROOT_DIR_FIELD = "chukwaRootDir"; public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir"; + public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir"; public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir"; + public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir"; public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir"; public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost"; @@ -38,6 +40,7 @@ public static final String DEFAULT_DEMUX_ROOT_DIR_NAME = "/chukwa/"; public static final String DEFAULT_REPOS_DIR_NAME = "repos/"; public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME = "postProcess/"; + public static final String DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME = "postProcessInError/"; public static final String DEFAULT_CHUKWA_LOGS_DIR_NAME = "logs/"; public static final String DEFAULT_DEMUX_PROCESSING_DIR_NAME = "demuxProcessing/"; Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=787449&r1=787448&r2=787449&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Mon Jun 22 23:55:58 2009 @@ -102,6 +102,12 @@ chukwaRootReposDir += "/"; } + String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME); + if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) { + chukwaPostProcessInErrorDir += "/"; + } + + dataSources = new HashMap(); Path postProcessDirectory = new Path(postProcessDir); while (isRunning) { @@ -134,21 +140,22 @@ log.info("PostProcess Start, directory:" + directoryToBeProcessed); start = System.currentTimeMillis(); - - if ( processDemuxOutput(directoryToBeProcessed) == true) { - if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) { - deleteDirectory(directoryToBeProcessed); - }else { - log.warn("Error in movetoMainRepository for :" + directoryToBeProcessed); - throw new RuntimeException(""); + + try { + if ( processDemuxPigOutput(directoryToBeProcessed) == true) { + if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) { + deleteDirectory(directoryToBeProcessed); + log.info("PostProcess Stop, directory:" + directoryToBeProcessed); + log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start)); + continue; + } } - - } else { + // if we are here it's because something bad happen during processing log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed); - throw new RuntimeException(""); - } - log.info("PostProcess Stop, directory:" + directoryToBeProcessed); - log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start)); + moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); + } catch (Throwable e) { + log.warn("Error in processDemuxOutput:" ,e); + } } } catch (Throwable e) { @@ -160,7 +167,7 @@ } } - public boolean processDemuxOutput(String directory) throws IOException { + public boolean processDemuxPigOutput(String directory) throws IOException { long start = System.currentTimeMillis(); try { String[] classes = conf.get(POST_DEMUX_DATA_LOADER).split(","); @@ -196,6 +203,21 @@ return true; } + public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception { + Path inErrorDir = new Path(inErrorDirectory); + if (!fs.exists(inErrorDir)) { + fs.mkdirs(inErrorDir); + } + + if (inErrorDirectory.endsWith("/")) { + inErrorDirectory += "/"; + } + String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis(); + fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory)); + log.warn("Error in postProcess :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory); + return true; + } + public boolean deleteDirectory(String directory) throws IOException { return fs.delete(new Path(directory), true); }