chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r763513 [2/2] - in /hadoop/chukwa/trunk: ./ bin/ conf/ lib/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extraction/archive/ src/java/org/apache/hadoop/chukwa/extrac...
Date Thu, 09 Apr 2009 04:18:37 GMT
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxManager.java?rev=763513&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxManager.java
(added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxManager.java
Thu Apr  9 04:18:36 2009
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+
+public class TestDemuxManager extends TestCase {
+
+  
+  /**
+   * Standard workflow
+   */
+  public void testScenario1() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+    
+     
+    try {
+      
+      File dataSinkDirectory = new File(chukwaRootDir +"/logs");
+      dataSinkDirectory.mkdirs();
+      File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink1.done");
+      dataSinkFile.createNewFile();
+      
+      DemuxManagerScenario dm = new DemuxManagerScenario1(cc,0);  
+      dm.start();
+      
+      List<String> requireActions = new ArrayList<String>();
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:false");
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
+      requireActions.add("runDemux:true");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
+      requireActions.add("processData done");
+      
+      List<String> actions = dm.actions;
+     
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for(int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+     Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+  }
+  
+  /**
+   * No dataSink file at  startup
+   * Add one later
+   */
+  public void testScenario2() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+
+    try {
+      DemuxManagerScenario dm = new DemuxManagerScenario2(cc);  
+      dm.start();
+ 
+      List<String> requireActions = new ArrayList<String>();
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:false");
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:false");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:false");
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
+      requireActions.add("runDemux:true");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
+      requireActions.add("processData done");
+
+      List<String> actions = dm.actions;
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for(int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+  }
+  
+  
+  /**
+   * DataSink file in dataSink directory
+   * MR_INPUT_DIR already there
+   * MR_INPUT_DIR should be reprocessed first
+   * Then dataSink file
+   */
+  public void testScenario3() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+
+    try {
+      File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+
CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
+      mrInputDir.mkdirs();
+      File dataSinkDirectory = new File(chukwaRootDir +"/logs");
+      dataSinkDirectory.mkdirs();
+      File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink3.done");
+      dataSinkFile.createNewFile();
+      
+      DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);  
+      dm.start();
+ 
+      List<String> requireActions = new ArrayList<String>();
+     
+      // DEMUX_INPUT_DIR reprocessing
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:true");
+      requireActions.add("runDemux:true");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
+      requireActions.add("processData done");
+
+      // dataSink3.done processing
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:false"); 
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
+      requireActions.add("runDemux:true"); 
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
+      requireActions.add("processData done");
+      
+      List<String> actions = dm.actions;
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for (int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+  }
+  
+  
+  /**
+   * DataSink file in dataSink directory
+   * MR_INPUT_DIR already there
+   * MR_OUTPUT_DIR already there
+   * MR_OUTPUT_DIR should be deleted first
+   * Then MR_INPUT_DIR should be reprocessed
+   * Then dataSink file
+   */
+  public void testScenario4() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+
+    try {
+      File mrInputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+
CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_INPUT_DIR_NAME);
+      mrInputDir.mkdirs();
+      File mrOutputDir = new File(chukwaRootDir + CHUKWA_CONSTANT.DEFAULT_DEMUX_PROCESSING_DIR_NAME+
CHUKWA_CONSTANT.DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
+      mrOutputDir.mkdirs();
+      
+      File dataSinkDirectory = new File(chukwaRootDir +"/logs");
+      dataSinkDirectory.mkdirs();
+      File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink4.done");
+      dataSinkFile.createNewFile();
+      
+      DemuxManagerScenario dm = new DemuxManagerScenario1(cc,2);  
+      dm.start();
+ 
+      List<String> requireActions = new ArrayList<String>();
+
+      requireActions.add("checkDemuxOutputDir:true"); 
+      requireActions.add("deleteDemuxOutputDir:true");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:true");
+      requireActions.add("runDemux:true"); 
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
+      requireActions.add("processData done");
+
+      // dataSink4.done processing
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:false"); 
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
+      requireActions.add("runDemux:true"); 
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
+      requireActions.add("processData done");
+            
+      List<String> actions = dm.actions;
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for(int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+  }
+  
+  /**
+   * DataSink file in dataSinkDir
+   * Demux fails 3 times
+   * Add a new DataSink file to dataSinkDir
+   * Demux succeed
+   */
+  public void testScenario5() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+
+    try {
+      File dataSinkDirectory = new File(chukwaRootDir +"/logs");
+      dataSinkDirectory.mkdirs();
+      File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink5-0.done");
+      dataSinkFile.createNewFile();
+      
+      DemuxManagerScenario dm = new DemuxManagerScenario5(cc);  
+      dm.start();
+ 
+      List<String> requireActions = new ArrayList<String>();
+      
+      // Move dataSink & process
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:false"); 
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
+      requireActions.add("runDemux:false"); 
+      requireActions.add("processData done"); 
+      
+      // Reprocess 1
+      requireActions.add("checkDemuxOutputDir:true"); 
+      requireActions.add("deleteDemuxOutputDir:true");
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:true"); 
+      requireActions.add("runDemux:false"); 
+      requireActions.add("processData done"); 
+      
+      // Reprocess 2
+      requireActions.add("checkDemuxOutputDir:true"); 
+      requireActions.add("deleteDemuxOutputDir:true");
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:true"); 
+      requireActions.add("runDemux:false"); 
+      requireActions.add("processData done"); 
+      
+      // Reprocess3
+      requireActions.add("checkDemuxOutputDir:true"); 
+      requireActions.add("deleteDemuxOutputDir:true");
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:true"); 
+      requireActions.add("runDemux:false"); 
+      requireActions.add("processData done"); 
+      requireActions.add("checkDemuxOutputDir:true"); 
+      requireActions.add("deleteDemuxOutputDir:true");
+      requireActions.add("checkDemuxOutputDir:false");
+      requireActions.add("checkDemuxInputDir:true");
+      requireActions.add("moveDataSinkFilesToDemuxErrorDirectory:true");
+     
+      requireActions.add("checkDemuxOutputDir:false"); 
+      requireActions.add("checkDemuxInputDir:false"); 
+      requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true"); 
+      requireActions.add("runDemux:true"); 
+      requireActions.add("checkDemuxOutputDir:true");
+      requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true"); 
+      requireActions.add("moveDataSinkFilesToArchiveDirectory:true"); 
+      requireActions.add("processData done");
+      
+      
+      List<String> actions = dm.actions;
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for(int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( i + " - " + requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+  }
+  
+  
+  /**
+   * Standard workflow with MR_OUTPUT
+   */
+  public void testScenario6() {
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    String tempDirectory = System.getProperty("test.build.data", "/tmp");
+    String chukwaRootDir = tempDirectory + "/demuxManagerTest_" + System.currentTimeMillis()
+"/";
+    
+    
+    cc.set(CHUKWA_CONSTANT.WRITER_HDFS_FILESYSTEM_FIELD, "file:///");
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD, chukwaRootDir );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +"/archives/" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +"/postProcess" );
+    cc.set(CHUKWA_CONSTANT.CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +"/logs" );
+    
+    try {
+      
+      File dataSinkDirectory = new File(chukwaRootDir +"/logs");
+      dataSinkDirectory.mkdirs();
+      File dataSinkFile = new File(chukwaRootDir +"/logs"+ "/dataSink6.done");
+      dataSinkFile.createNewFile();
+      
+      DemuxManagerScenario dm = new DemuxManagerScenario6(cc,3);  
+      dm.start();
+      
+      List<String> requireActions = new ArrayList<String>();
+      for (int i=0;i<3;i++) {
+        requireActions.add("checkDemuxOutputDir:false");
+        requireActions.add("checkDemuxInputDir:false");
+        requireActions.add("moveDataSinkFilesToDemuxInputDirectory:true");
+        requireActions.add("runDemux:true");
+        requireActions.add("checkDemuxOutputDir:true");
+        requireActions.add("moveDemuxOutputDirToPostProcessDirectory:true");
+        requireActions.add("moveDataSinkFilesToArchiveDirectory:true");
+        requireActions.add("processData done");   
+      }
+     
+      List<String> actions = dm.actions;
+      Assert.assertTrue(requireActions.size() == actions.size());
+      
+      for(int i=0;i<requireActions.size();i++) {
+        Assert.assertTrue( requireActions.get(i) + " == " +actions.get(i),requireActions.get(i).intern()
== actions.get(i).intern());
+      }
+
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+     Assert.fail();
+    }
+    finally {
+      deleteDirectory(new File(chukwaRootDir));
+    }
+    
+  }
+  
+  
+  static public boolean deleteDirectory(File path) {
+    if( path.exists() ) {
+      File[] files = path.listFiles();
+      for(int i=0; i<files.length; i++) {
+         if(files[i].isDirectory()) {
+           deleteDirectory(files[i]);
+         }
+         else {
+           files[i].delete();
+         }
+      }
+    }
+    return( path.delete() );
+  }
+
+     /////////////////////////\
+    //// HELPER CLASSES /////  \
+   /////////////////////////____\
+  
+  class DemuxManagerScenario6 extends DemuxManagerScenario {
+    int count = 0;
+    public DemuxManagerScenario6(ChukwaConfiguration conf, int count) throws Exception {
+      super(conf);
+      this.count = count;
+    }
+    @Override
+    public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+      boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
+      
+      try {
+        // Create DEMUX_OUTPOUT
+        String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
+        File mrOutputDir = new File(chukwaRootDir  + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
+        mrOutputDir.mkdirs();
+        mrOutputDir.deleteOnExit();
+        
+        // ADD DATASINK FILE
+        File dataSinkDirectory = new File(chukwaRootDir +"logs");
+        dataSinkDirectory.mkdirs();
+        File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink6-" + count + ".done");
+        dataSinkFile.createNewFile();
+        
+      }catch(Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      count --;
+      if (count <= 0) {
+        this.isRunning = false;
+      }
+     
+      return res;
+    }
+  }
+  
+  
+  class DemuxManagerScenario5 extends DemuxManagerScenario {
+    public DemuxManagerScenario5(ChukwaConfiguration conf) throws Exception {
+      super(conf);
+    }
+    
+    boolean errorDone = false;
+    public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
+        String demuxErrorDir) throws IOException {
+      boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
+      
+      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
+      File dataSinkDirectory = new File(chukwaRootDir +"logs");
+      dataSinkDirectory.mkdirs();
+      dataSinkDirectory.deleteOnExit();
+      File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink5-1.done");
+      dataSinkFile.createNewFile();
+      dataSinkFile.deleteOnExit();       
+  
+      errorDone = true;
+      return res;
+    }
+    
+    int counter = 0;
+    @Override
+    public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+      if (errorDone && counter >= 4) {
+        this.isRunning = false;
+      } 
+      
+      // Create DEMUX_OUTPOUT
+      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
+      File mrOutputDir = new File(chukwaRootDir  + DEFAULT_DEMUX_PROCESSING_DIR_NAME+ DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME);
+      mrOutputDir.mkdirs();
+      mrOutputDir.deleteOnExit();
+      
+      counter ++;
+      this.actions.add("runDemux:" + errorDone);
+      return errorDone;
+    }
+  }
+  
+  class DemuxManagerScenario2 extends DemuxManagerScenario {
+    public DemuxManagerScenario2(ChukwaConfiguration conf) throws Exception {
+      super(conf);
+    }
+    
+    int counter = 0;
+    @Override
+    public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
+        String demuxInputDir) throws IOException {
+      if (counter == 1) {
+        String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD);
+        File dataSinkDirectory = new File(chukwaRootDir +"logs");
+        dataSinkDirectory.mkdirs();
+        dataSinkDirectory.deleteOnExit();
+        File dataSinkFile = new File(chukwaRootDir +"logs"+ "/dataSink2.done");
+        dataSinkFile.createNewFile();
+        dataSinkFile.deleteOnExit();
+      }
+
+      counter ++;
+      return super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
+    }
+    
+   
+    @Override
+    public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+      boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
+      if (counter > 1) {
+        this.isRunning = false;
+      }
+      return res;
+    }
+  }
+  
+  class DemuxManagerScenario1 extends DemuxManagerScenario {
+    int count = 0;
+    public DemuxManagerScenario1(ChukwaConfiguration conf, int count) throws Exception {
+      super(conf);
+      this.count = count;
+    }
+    @Override
+    public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+      boolean res = super.runDemux(demuxInputDir, demuxOutputDir);
+      count --;
+      if (count <= 0) {
+        this.isRunning = false;
+      }
+     
+      return res;
+    }
+  }
+  
+  class DemuxManagerScenario extends DemuxManager {
+    public List<String>actions = new ArrayList<String>();
+ 
+    public DemuxManagerScenario(ChukwaConfiguration conf) throws Exception {
+      super(conf);
+      NO_DATASINK_SLEEP_TIME = 5;
+    }
+
+    @Override
+    public boolean checkDemuxInputDir(String demuxInputDir) throws IOException {
+      boolean res = super.checkDemuxInputDir(demuxInputDir);
+      this.actions.add("checkDemuxInputDir:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean checkDemuxOutputDir(String demuxOutputDir)
+        throws IOException {
+      boolean res = super.checkDemuxOutputDir(demuxOutputDir);
+      this.actions.add("checkDemuxOutputDir:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean moveDataSinkFilesToArchiveDirectory(String demuxInputDir,
+        String archiveDirectory) throws IOException {
+      boolean res = super.moveDataSinkFilesToArchiveDirectory(demuxInputDir,
+          archiveDirectory);
+      this.actions.add("moveDataSinkFilesToArchiveDirectory:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
+        String demuxErrorDir) throws IOException {
+      boolean res = super.moveDataSinkFilesToDemuxErrorDirectory(dataSinkDir, demuxErrorDir);
+      this.actions.add("moveDataSinkFilesToDemuxErrorDirectory:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
+        String demuxInputDir) throws IOException {
+      boolean res = super.moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
+      this.actions.add("moveDataSinkFilesToDemuxInputDirectory:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean moveDemuxOutputDirToPostProcessDirectory(
+        String demuxOutputDir, String postProcessDirectory) throws IOException {
+      boolean res = super.moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir,
+          postProcessDirectory);
+      this.actions.add("moveDemuxOutputDirToPostProcessDirectory:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean processData(String dataSinkDir, String demuxInputDir,
+        String demuxOutputDir, String postProcessDir, String archiveDir)
+        throws IOException {
+      boolean res = super.processData(dataSinkDir, demuxInputDir, demuxOutputDir, postProcessDir,
+          archiveDir);
+      this.actions.add("processData done");
+      return res;
+    }
+
+    @Override
+    public boolean runDemux(String demuxInputDir, String demuxOutputDir) {
+      boolean res = true;
+      this.actions.add("runDemux:" + res);
+      return res;
+    }
+
+    @Override
+    public boolean deleteDemuxOutputDir(String demuxOutputDir)
+        throws IOException {
+      boolean res = super.deleteDemuxOutputDir(demuxOutputDir);
+      this.actions.add("deleteDemuxOutputDir:" + res);
+      return res;
+    }
+    
+  }
+
+}



Mime
View raw message