chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r787449 - in /hadoop/chukwa: branches/chukwa-0.2/ branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/ branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/ trunk/ trunk/src/java/org/apache/hadoop/chukwa/extra...
Date Mon, 22 Jun 2009 23:55:58 GMT
Author: eyang
Date: Mon Jun 22 23:55:58 2009
New Revision: 787449

URL: http://svn.apache.org/viewvc?rev=787449&view=rev
Log:
CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory.
(Jerome Boulon via Eric Yang)

Modified:
    hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt (original)
+++ hadoop/chukwa/branches/chukwa-0.2/CHANGES.txt Mon Jun 22 23:55:58 2009
@@ -34,6 +34,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory.
(Jerome Boulon via Eric Yang)
+
     CHUKWA-318. Added check for disk capacity for collector to write. (Jerome Boulon via
Eric Yang)
 
     CHUKWA-276. Fix hourly and daily rolling to use a single reducer. (Jerome Boulon via
Eric Yang)

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
(original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
Mon Jun 22 23:55:58 2009
@@ -24,8 +24,10 @@
   public static final String CHUKWA_ROOT_DIR_FIELD = "chukwaRootDir";
   public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir";
 
+  
   public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir";
   public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
+  public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir";
   public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir";
   
   public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost";
@@ -38,6 +40,7 @@
   public static final String DEFAULT_DEMUX_ROOT_DIR_NAME          = "/chukwa/";
   public static final String DEFAULT_REPOS_DIR_NAME               = "repos/";
   public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME  = "postProcess/";
+  public static final String DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME = "postProcessInError/";
   public static final String DEFAULT_CHUKWA_LOGS_DIR_NAME         = "logs/";
   
   public static final String DEFAULT_DEMUX_PROCESSING_DIR_NAME    = "demuxProcessing/";

Modified: hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
(original)
+++ hadoop/chukwa/branches/chukwa-0.2/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
Mon Jun 22 23:55:58 2009
@@ -102,6 +102,12 @@
       chukwaRootReposDir += "/";
     }
  
+    String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD,
chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
+    if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
+      chukwaPostProcessInErrorDir += "/";
+    }
+ 
+    
     dataSources = new HashMap<String, String>();
     Path postProcessDirectory = new Path(postProcessDir);
     while (isRunning) {
@@ -134,21 +140,22 @@
           
           log.info("PostProcess Start, directory:" + directoryToBeProcessed);
           start = System.currentTimeMillis();
-          
-          if ( processDemuxOutput(directoryToBeProcessed) == true) {
-            if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true)
{
-              deleteDirectory(directoryToBeProcessed);
-            }else {
-              log.warn("Error in movetoMainRepository for :" + directoryToBeProcessed);
-              throw new RuntimeException("");
+         
+          try {
+            if ( processDemuxPigOutput(directoryToBeProcessed) == true) {
+              if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true)
{
+                deleteDirectory(directoryToBeProcessed);
+                log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
+                log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+                continue;
+              }
             }
-            
-          } else {
+            // if we are here it's because something bad happen during processing
             log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
-            throw new RuntimeException("");
-          }
-          log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
-          log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+            moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);

+          } catch (Throwable e) {
+            log.warn("Error in processDemuxOutput:" ,e);
+          } 
         }
        
       } catch (Throwable e) {
@@ -160,7 +167,7 @@
     }
   }
   
-  public boolean processDemuxOutput(String directory) throws IOException {
+  public boolean processDemuxPigOutput(String directory) throws IOException {
     long start = System.currentTimeMillis();
     try {
       String[] classes = conf.get(POST_DEMUX_DATA_LOADER).split(",");
@@ -196,6 +203,21 @@
     return true;
   }
   
+  public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory)
throws Exception {
+    Path inErrorDir = new Path(inErrorDirectory);
+    if (!fs.exists(inErrorDir)) {
+      fs.mkdirs(inErrorDir);
+    }
+    
+    if (inErrorDirectory.endsWith("/")) {
+      inErrorDirectory += "/";
+    }
+    String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
+    fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
+    log.warn("Error in postProcess  :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
+    return true;
+  }
+  
   public boolean deleteDirectory(String directory) throws IOException {
    return fs.delete(new Path(directory), true);
   }

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Jun 22 23:55:58 2009
@@ -34,6 +34,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory.
(Jerome Boulon via Eric Yang)
+
     CHUKWA-318. Added check for disk capacity for collector to write. (Jerome Boulon via
Eric Yang)
 
     CHUKWA-276. Fix hourly and daily rolling to use a single reducer. (Jerome Boulon via
Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
Mon Jun 22 23:55:58 2009
@@ -24,8 +24,10 @@
   public static final String CHUKWA_ROOT_DIR_FIELD = "chukwaRootDir";
   public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir";
 
+  
   public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir";
   public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
+  public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir";
   public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir";
   
   public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost";
@@ -38,6 +40,7 @@
   public static final String DEFAULT_DEMUX_ROOT_DIR_NAME          = "/chukwa/";
   public static final String DEFAULT_REPOS_DIR_NAME               = "repos/";
   public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME  = "postProcess/";
+  public static final String DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME = "postProcessInError/";
   public static final String DEFAULT_CHUKWA_LOGS_DIR_NAME         = "logs/";
   
   public static final String DEFAULT_DEMUX_PROCESSING_DIR_NAME    = "demuxProcessing/";

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=787449&r1=787448&r2=787449&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
Mon Jun 22 23:55:58 2009
@@ -102,6 +102,12 @@
       chukwaRootReposDir += "/";
     }
  
+    String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD,
chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
+    if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
+      chukwaPostProcessInErrorDir += "/";
+    }
+ 
+    
     dataSources = new HashMap<String, String>();
     Path postProcessDirectory = new Path(postProcessDir);
     while (isRunning) {
@@ -134,21 +140,22 @@
           
           log.info("PostProcess Start, directory:" + directoryToBeProcessed);
           start = System.currentTimeMillis();
-          
-          if ( processDemuxOutput(directoryToBeProcessed) == true) {
-            if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true)
{
-              deleteDirectory(directoryToBeProcessed);
-            }else {
-              log.warn("Error in movetoMainRepository for :" + directoryToBeProcessed);
-              throw new RuntimeException("");
+         
+          try {
+            if ( processDemuxPigOutput(directoryToBeProcessed) == true) {
+              if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true)
{
+                deleteDirectory(directoryToBeProcessed);
+                log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
+                log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+                continue;
+              }
             }
-            
-          } else {
+            // if we are here it's because something bad happen during processing
             log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
-            throw new RuntimeException("");
-          }
-          log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
-          log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+            moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);

+          } catch (Throwable e) {
+            log.warn("Error in processDemuxOutput:" ,e);
+          } 
         }
        
       } catch (Throwable e) {
@@ -160,7 +167,7 @@
     }
   }
   
-  public boolean processDemuxOutput(String directory) throws IOException {
+  public boolean processDemuxPigOutput(String directory) throws IOException {
     long start = System.currentTimeMillis();
     try {
       String[] classes = conf.get(POST_DEMUX_DATA_LOADER).split(",");
@@ -196,6 +203,21 @@
     return true;
   }
   
+  public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory)
throws Exception {
+    Path inErrorDir = new Path(inErrorDirectory);
+    if (!fs.exists(inErrorDir)) {
+      fs.mkdirs(inErrorDir);
+    }
+    
+    if (inErrorDirectory.endsWith("/")) {
+      inErrorDirectory += "/";
+    }
+    String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
+    fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
+    log.warn("Error in postProcess  :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
+    return true;
+  }
+  
   public boolean deleteDirectory(String directory) throws IOException {
    return fs.delete(new Path(directory), true);
   }



Mime
View raw message