chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r937859 - in /hadoop/chukwa/trunk: conf/ src/java/org/apache/hadoop/chukwa/datatrigger/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extraction/demux/ src/test/org/apache/hadoop/chukwa/extraction/demux/
Date Sun, 25 Apr 2010 19:12:23 GMT
Author: eyang
Date: Sun Apr 25 19:12:23 2010
New Revision: 937859

URL: http://svn.apache.org/viewvc?rev=937859&view=rev
Log:
CHUKWA-477. Support post-demux trigger.  (Bill Graham via Eric Yang)

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerAction.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerEvent.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/MockTriggerAction.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestPostDemuxTrigger.java
Modified:
    hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=937859&r1=937858&r2=937859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template Sun Apr 25 19:12:23 2010
@@ -69,6 +69,14 @@
     <value>org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader</value>
   </property>
 
+  <property>
+    <name>chukwa.post.demux.success.action</name>
+    <value></value>
+    <description>Comma-separated list of
+    org.apache.hadoop.chukwa.datatrigger.TriggerAction implementations
+    that will be triggered upon a successful Demux/PostProcessor job</description>
+  </property>
+
 <!-- -->
 
 <!-- ArchiveManager config -->

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerAction.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerAction.java?rev=937859&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerAction.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerAction.java Sun Apr 25 19:12:23 2010
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */                               
+package org.apache.hadoop.chukwa.datatrigger;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+public interface TriggerAction {
+
+  public void execute(Configuration conf, FileSystem fs,
+                      FileStatus[] src, TriggerEvent triggerEvent) throws IOException;
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerEvent.java?rev=937859&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerEvent.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/TriggerEvent.java Sun Apr 25 19:12:23 2010
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datatrigger;
+
+
+/**
+ * enum that encapsulates the different possible events that can be triggered.
+ * When a call is made to a TriggerAction class, the caller must pass a TriggerEvent
+ * object to identify the event that is occurring.
+ */
+public enum TriggerEvent {
+
+  POST_DEMUX_SUCCESS("postDemuxSuccess", "chukwa.trigger.post.demux.success");
+
+  private String name;
+  private String configKeyBase;
+
+  private TriggerEvent(String name, String configKeyBase) {
+    this.name = name;
+    this.configKeyBase = configKeyBase;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getConfigKeyBase() {
+    return configKeyBase;
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=937859&r1=937858&r2=937859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Sun Apr 25 19:12:23 2010
@@ -60,4 +60,5 @@ public interface CHUKWA_CONSTANT {
   public static final String ARCHIVES_IN_ERROR_DIR_NAME      = "inError/";
 
   public static final String POST_DEMUX_DATA_LOADER = "chukwa.post.demux.data.loader";  
+  public static final String POST_DEMUX_SUCCESS_ACTION = "chukwa.post.demux.success.action";
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=937859&r1=937858&r2=937859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Sun Apr 25 19:12:23 2010
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,11 +47,12 @@ public class MoveToRepository {
   static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
   static Calendar calendar = Calendar.getInstance();
 
-  static void processClusterDirectory(Path srcDir, String destDir)
+  static Collection<Path> processClusterDirectory(Path srcDir, String destDir)
       throws Exception {
     log.info("processClusterDirectory (" + srcDir.getName() + "," + destDir
         + ")");
     FileStatus fstat = fs.getFileStatus(srcDir);
+    Collection<Path> destFiles = new HashSet<Path>();
 
     if (!fstat.isDir()) {
       throw new IOException(srcDir + " is not a directory!");
@@ -69,14 +73,16 @@ public class MoveToRepository {
         log.info("dest directory path: " + destPath);
         log.info("processClusterDirectory processing Datasource: (" + dirName
             + ")");
-        processDatasourceDirectory(srcDir.getName(), datasourceDirectory
-            .getPath(), destDir + "/" + dirName);
+        destFiles.addAll(processDatasourceDirectory(srcDir.getName(),
+            datasourceDirectory.getPath(), destDir + "/" + dirName));
       }
     }
+    return destFiles;
   }
 
-  static void processDatasourceDirectory(String cluster, Path srcDir,
+  static Collection<Path> processDatasourceDirectory(String cluster, Path srcDir,
       String destDir) throws Exception {
+    Collection<Path> destFiles = new HashSet<Path>();
     String fileName = null;
     int fileDay = 0;
     int fileHour = 0;
@@ -100,8 +106,11 @@ public class MoveToRepository {
         // Hadoop_dfs_datanode_20080919.D.evt
 
         fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
-        writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),
-            dataSource + "_" + fileDay);
+        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
+            recordFile.getPath(), dataSource + "_" + fileDay);
+        if (destFile != null) {
+          destFiles.add(destFile);
+        }
       } else if (fileName.endsWith(".H.evt")) {
         // Hadoop_dfs_datanode_20080925_1.H.evt
         // Hadoop_dfs_datanode_20080925_12.H.evt
@@ -122,8 +131,11 @@ public class MoveToRepository {
         fileDay = Integer.parseInt(day);
         fileHour = Integer.parseInt(hour);
         // rotate there so spill
-        writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
+        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
             recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
+        if (destFile != null) {
+          destFiles.add(destFile);
+        }
         // mark this directory for daily rotate
         addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
       } else if (fileName.endsWith(".R.evt")) {
@@ -140,15 +152,20 @@ public class MoveToRepository {
         log.info("fileDay: " + fileDay);
         log.info("fileHour: " + fileHour);
         log.info("fileMin: " + fileMin);
-        writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
+        Path destFile = writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
             + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
             + fileHour + "_" + fileMin);
+        if (destFile != null) {
+          destFiles.add(destFile);
+        }
         // mark this directory for hourly rotate
         addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
       } else {
         throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
       }
     }
+
+    return destFiles;
   }
 
   static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
@@ -171,7 +188,7 @@ public class MoveToRepository {
     }
   }
 
-  static void writeRecordFile(String destDir, Path recordFile, String fileName)
+  static Path writeRecordFile(String destDir, Path recordFile, String fileName)
       throws IOException {
     boolean done = false;
     int count = 1;
@@ -191,6 +208,7 @@ public class MoveToRepository {
         boolean rename = fs.rename(recordFile,destFilePath);
         done = true;
         log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
+        return destFilePath;
       } 
       count++;
 
@@ -198,6 +216,8 @@ public class MoveToRepository {
         log.warn("too many files in this directory: " + destDir);
       }
     } while (!done);
+
+    return null;
   }
 
   static boolean checkRotate(String directoryAsString,
@@ -215,22 +235,15 @@ public class MoveToRepository {
     }
   }
 
-  /**
-   * @param args
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
+  public static Path[] doMove(Path srcDir, String destDir) throws Exception {
     conf = new ChukwaConfiguration();
     String fsName = conf.get("writer.hdfs.filesystem");
     fs = FileSystem.get(new URI(fsName), conf);
-
-    Path srcDir = new Path(args[0]);
-    String destDir = args[1];
-
-    log.info("Start MoveToRepository main()");
+    log.info("Start MoveToRepository doMove()");
 
     FileStatus fstat = fs.getFileStatus(srcDir);
 
+    Collection<Path> destinationFiles = new HashSet<Path>();
     if (!fstat.isDir()) {
       throw new IOException(srcDir + " is not a directory!");
     } else {
@@ -247,16 +260,27 @@ public class MoveToRepository {
         log
             .info("main procesing Cluster (" + cluster.getPath().getName()
                 + ")");
-        processClusterDirectory(cluster.getPath(), destDir + "/"
-            + cluster.getPath().getName());
+        destinationFiles.addAll(processClusterDirectory(cluster.getPath(),
+            destDir + "/" + cluster.getPath().getName()));
 
         // Delete the demux's cluster dir
         FileUtil.fullyDelete(fs, cluster.getPath());
       }
     }
 
-    log.info("Done with MoveToRepository main()");
+    log.info("Done with MoveToRepository doMove()");
+    return destinationFiles.toArray(new Path[destinationFiles.size()]);
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
 
+    Path srcDir = new Path(args[0]);
+    String destDir = args[1];
+    doMove(srcDir, destDir);
   }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=937859&r1=937858&r2=937859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Sun Apr 25 19:12:23 2010
@@ -25,12 +25,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Collection;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
+import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,11 +145,14 @@ public class PostProcessorManager implem
           start = System.currentTimeMillis();
          
           try {
-            if ( processDemuxPigOutput(directoryToBeProcessed) == true) {
-              if (movetoMainRepository(directoryToBeProcessed,chukwaRootReposDir) == true) {
+            if ( processDataLoaders(directoryToBeProcessed) == true) {
+              Path[] destFiles = movetoMainRepository(
+                directoryToBeProcessed,chukwaRootReposDir);
+              if (destFiles != null && destFiles.length > 0) {
                 deleteDirectory(directoryToBeProcessed);
                 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
                 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
+                processPostMoveTriggers(destFiles);
                 continue;
               }
             }
@@ -167,7 +173,7 @@ public class PostProcessorManager implem
     }
   }
   
-  public boolean processDemuxPigOutput(String directory) throws IOException {
+  public boolean processDataLoaders(String directory) throws IOException {
     long start = System.currentTimeMillis();
     try {
       String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
@@ -194,14 +200,44 @@ public class PostProcessorManager implem
     log.info("loadData Duration:" + (System.currentTimeMillis() - start));
     return true;
   }
-  
-  public boolean movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
-    String[] args = {sourceDirectory,repoRootDirectory};
+
+  public boolean processPostMoveTriggers(Path[] files) throws IOException {
     long start = System.currentTimeMillis();
-    MoveToRepository.main(args);
-    log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
+    try {
+      String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
+      if (actions == null || actions.trim().length() == 0) {
+        return true;
+      }
+      log.info("PostProcess firing postMoveTriggers");
+
+      String[] classes = actions.trim().split(",");
+      for(String actionName : classes) {
+        Class<? extends TriggerAction> actionClass =
+            (Class<? extends TriggerAction>) Class.forName(actionName);
+        java.lang.reflect.Constructor<? extends TriggerAction> c =
+            actionClass.getConstructor();
+        TriggerAction action = c.newInstance();
+
+        log.info(actionName + " handling " + files.length + " events");
+
+        //send the files that were just added beneath the repos/ dir.
+        FileStatus[] events = fs.listStatus(files);
+        action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
+      }
+    } catch(Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+      return false;
+    }
+    log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
     return true;
   }
+
+  public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
+    long start = System.currentTimeMillis();
+    Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
+    log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
+    return destFiles;
+  }
   
   public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
     Path inErrorDir = new Path(inErrorDirectory);

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/MockTriggerAction.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/MockTriggerAction.java?rev=937859&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/MockTriggerAction.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/MockTriggerAction.java Sun Apr 25 19:12:23 2010
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
+import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ArrayList;
+
+public class MockTriggerAction implements TriggerAction {
+
+  private static Collection<TriggerEvent> triggerEvents = new ArrayList<TriggerEvent>();
+
+  public void execute(Configuration conf, FileSystem fs,
+                      FileStatus[] src, TriggerEvent event) throws IOException {
+    triggerEvents.add(event);
+  }
+
+  public static void reset() {
+    triggerEvents = new ArrayList<TriggerEvent>();
+  }
+
+  public static Collection<TriggerEvent> getTriggerEvents() {
+    return triggerEvents;
+  }
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestPostDemuxTrigger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestPostDemuxTrigger.java?rev=937859&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestPostDemuxTrigger.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestPostDemuxTrigger.java Sun Apr 25 19:12:23 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Iterator;
+
+public class TestPostDemuxTrigger extends TestCase {
+
+  static final Path[] SAMPLE_PATHS = new Path[] { new Path("/") };
+
+  protected void setUp() throws Exception {
+    MockTriggerAction.reset();
+  }
+
+  public void testSuccessTrigger() throws Exception {
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    conf.set(CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION,
+            "org.apache.hadoop.chukwa.extraction.demux.MockTriggerAction");
+
+    PostProcessorManager postProcessManager = new PostProcessorManager(conf);
+    assertTrue("processPostMoveTriggers returned false",
+            postProcessManager.processPostMoveTriggers(SAMPLE_PATHS));
+
+    assertEquals("Trigger never invoked", SAMPLE_PATHS.length,
+            MockTriggerAction.getTriggerEvents().size());
+    Iterator events = MockTriggerAction.getTriggerEvents().iterator();
+    assertEquals("Incorrect Trigger event found", TriggerEvent.POST_DEMUX_SUCCESS,
+            events.next());
+  }
+
+  public void testMultiSuccessTrigger() throws Exception {
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    conf.set(CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION,
+            "org.apache.hadoop.chukwa.extraction.demux.MockTriggerAction," +
+            "org.apache.hadoop.chukwa.extraction.demux.MockTriggerAction");
+
+    PostProcessorManager postProcessManager = new PostProcessorManager(conf);
+    assertTrue("processPostMoveTriggers returned false",
+            postProcessManager.processPostMoveTriggers(SAMPLE_PATHS));
+
+    assertEquals("Trigger never invoked", 2 * SAMPLE_PATHS.length,
+            MockTriggerAction.getTriggerEvents().size());
+    Iterator events = MockTriggerAction.getTriggerEvents().iterator();
+    assertEquals("Incorrect Trigger event found", TriggerEvent.POST_DEMUX_SUCCESS,
+            events.next());
+    assertEquals("Incorrect Trigger event found", TriggerEvent.POST_DEMUX_SUCCESS,
+            events.next());
+  }
+
+}



Mime
View raw message