chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r793678 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extraction/archive/ src/java/org/apache/hadoop/chukwa/extraction/d...
Date Mon, 13 Jul 2009 19:33:39 GMT
Author: asrabkin
Date: Mon Jul 13 19:33:38 2009
New Revision: 793678

URL: http://svn.apache.org/viewvc?rev=793678&view=rev
Log:
CHUKWA-346. Simplified sink archiver

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/SinkArchiver.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Jul 13 19:33:38 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-346. Simplified sink archiver. (asrabkin)
+
     CHUKWA-343. Static HDFS Heatmap visualization.   (Jiaqi Tan via asrabkin)
 
     CHUKWA-342. Static swimlanes visualization widget.   (Jiaqi Tan via asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Mon Jul 13 19:33:38 2009
@@ -210,7 +210,7 @@
       checkpointDir.mkdirs();
     }
     tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
-    DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
+    DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
 
     log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
     log.info("Config - checkpointDir: [" + checkpointDir + "]");

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
Mon Jul 13 19:33:38 2009
@@ -25,6 +25,7 @@
   public static final String CHUKWA_ROOT_REPOS_DIR_FIELD = "chukwaRootReposDir";
 
   
+  //This is the INPUT directory for archiving; defaults to /chukwa/logs
   public static final String CHUKWA_ARCHIVE_DIR_FIELD = "chukwaArchiveDir";
   public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
   public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir";
@@ -37,7 +38,7 @@
   
   public static final String CHUKWA_DEMUX_REDUCER_COUNT_FIELD     = "demux.reducerCount";
   
-  public static final String DEFAULT_DEMUX_ROOT_DIR_NAME          = "/chukwa/";
+  public static final String DEFAULT_CHUKWA_ROOT_DIR_NAME          = "/chukwa/";
   public static final String DEFAULT_REPOS_DIR_NAME               = "repos/";
   public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME  = "postProcess/";
   public static final String DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME = "postProcessInError/";
@@ -51,10 +52,12 @@
   public static final String DEFAULT_CHUKWA_DATASINK_DIR_NAME     = "dataSinkArchives/";
   public static final String DEFAULT_FINAL_ARCHIVES               = "finalArchives/";
   
-  public static final String DEFAULT_ARCHIVES_PROCESSING_DIR_NAME    = "archivesProcessing/";
-  public static final String DEFAULT_ARCHIVES_MR_OUTPUT_DIR_NAME     = "mrOutput/";
-  public static final String DEFAULT_ARCHIVES_MR_INPUT_DIR_NAME      = "mrInput/";
-  public static final String DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME      = "inError/";
+    //These fields control the working dirs for the archive mapred job.
+    //They are not configurable at runtime.
+  public static final String ARCHIVES_PROCESSING_DIR_NAME    = "archivesProcessing/";
+  public static final String ARCHIVES_MR_OUTPUT_DIR_NAME     = "mrOutput/";
+  public static final String ARCHIVES_MR_INPUT_DIR_NAME      = "mrInput/";
+  public static final String ARCHIVES_IN_ERROR_DIR_NAME      = "inError/";
 
   public static final String POST_DEMUX_DATA_LOADER = "chukwa.post.demux.data.loader";  
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
Mon Jul 13 19:33:38 2009
@@ -19,9 +19,13 @@
 package org.apache.hadoop.chukwa.extraction.archive;
 
 
+import java.io.IOException;
+import java.util.Iterator;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +33,10 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -46,6 +54,23 @@
  *
  */
 public class ChukwaArchiveBuilder extends Configured implements Tool {
+  
+  
+  static class UniqueKeyReduce extends MapReduceBase
+      implements Reducer<ChukwaArchiveKey, ChunkImpl, ChukwaArchiveKey, ChunkImpl> {
+
+    /**
+     * Deduplicating reduce: of all chunks grouped under one archive key,
+     * only the first is written out, so repeated chunks collapse to one.
+     */
+    @Override
+    public void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> chunks,
+        OutputCollector<ChukwaArchiveKey, ChunkImpl> collector, Reporter reporter)
+        throws IOException {
+      collector.collect(key, chunks.next());
+    }
+
+  }
   static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
 
   static int printUsage() {
@@ -68,7 +93,9 @@
     jobConf.setInputFormat(SequenceFileInputFormat.class);
 
     jobConf.setMapperClass(IdentityMapper.class);
-    jobConf.setReducerClass(IdentityReducer.class);
+    
+    jobConf.setReducerClass(UniqueKeyReduce.class);
+//    jobConf.setReducerClass(IdentityReducer.class);
 
     if (args[0].equalsIgnoreCase("Daily")) {
       jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
Mon Jul 13 19:33:38 2009
@@ -72,7 +72,7 @@
   
   public void start() throws Exception {
     
-    String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_DEMUX_ROOT_DIR_NAME);
+    String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
     if ( ! chukwaRootDir.endsWith("/") ) {
       chukwaRootDir += "/";
     }
@@ -86,10 +86,10 @@
     Path pArchiveRootDir = new Path(archiveRootDir);
     setup(pArchiveRootDir);
     
-    String archivesRootProcessingDir = chukwaRootDir + DEFAULT_ARCHIVES_PROCESSING_DIR_NAME;
+    String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
     // String archivesErrorDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME;
-    String archivesMRInputDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_MR_INPUT_DIR_NAME;
-    String archivesMROutputDir = archivesRootProcessingDir+ DEFAULT_ARCHIVES_MR_OUTPUT_DIR_NAME;
+    String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
+    String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
     String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
 
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
Mon Jul 13 19:33:38 2009
@@ -1,6 +0,0 @@
-package org.apache.hadoop.chukwa.extraction.archive;
-
-
-public class ChukwaArchiveMerger {
-
-}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/SinkArchiver.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/SinkArchiver.java?rev=793678&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/SinkArchiver.java
(added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/SinkArchiver.java
Mon Jul 13 19:33:38 2009
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+
+/**
+ * A lightweight tool for archiving, suitable for small-to-medium-size Chukwa
+ * deployments that don't use Demux.
+ * Grabs every closed (*.done) file in the data sink, runs the Archiver
+ * MapReduce job over them, then promotes the output to the archive dir.
+ * 
+ * Input is determined by conf option chukwaArchiveDir; defaults to
+ *   /chukwa/logs
+ *   
+ *   Uses /chukwa/archivesProcessing/mr[Input/Output] as tmp storage
+ *   
+ *   Outputs to /chukwa/archive
+ * 
+ */
+public class SinkArchiver implements CHUKWA_CONSTANT {
+  
+  /** Accepts only data-sink files that have been closed (named *.done). */
+  final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
+    public boolean accept(Path file) {
+      return file.getName().endsWith(".done");
+    }     
+  };
+  
+  static Logger log = Logger.getLogger(SinkArchiver.class);
+  
+  public static void main(String[] args) {
+    try {
+      Configuration conf = new ChukwaConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      SinkArchiver archiver = new SinkArchiver();
+      archiver.exec(fs, conf);    
+    } catch(IOException e) {
+      log.error("could not start sink archiver", e);
+    }
+  }
+  
+
+  /**
+   * Runs one archiving pass.  Most of the logic lives in instance methods so
+   * that it can be unit-tested by altering the passed-in configuration.
+   *
+   * If the MapReduce job fails, its partial output is discarded rather than
+   * promoted, and the selected inputs stay in the mrInput dir so the next
+   * pass retries them.  Errors are logged, not thrown.
+   */
+  public void exec(FileSystem fs, Configuration conf) {
+    try {
+      
+      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
+      if ( ! chukwaRootDir.endsWith("/") ) {
+        chukwaRootDir += "/";
+      }
+      String archiveSource = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
+      if ( ! archiveSource.endsWith("/") ) {
+        archiveSource += "/";
+      }
+      String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
+      
+      String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
+      String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
+
+      Path pSource = new Path(archiveSource);
+      
+      Path pMRInputDir = new Path(archivesMRInputDir);
+      if(!fs.exists(pMRInputDir))
+        fs.mkdirs(pMRInputDir);
+      
+      Path pOutputDir = new Path(archivesMROutputDir);
+      
+      Path archive = new Path(chukwaRootDir + "archive");
+      
+      selectInputs(fs, pSource, pMRInputDir);
+
+      int result = runMapRedJob(conf, archivesMRInputDir, archivesMROutputDir);
+      if(result != 0) {
+        //promoting partial output would duplicate data once inputs are retried
+        log.error("archive job failed with status " + result
+            + "; leaving inputs in " + pMRInputDir + " for retry");
+        fs.delete(pOutputDir, true);
+        return;
+      }
+      fs.delete(pMRInputDir, true); //success, so empty input dir
+      
+      FileStatus[] files = fs.listStatus(pOutputDir);
+      if(files != null) { //older FileSystem.listStatus returns null if missing
+        for(FileStatus f: files) {
+          if(!f.getPath().getName().endsWith("_logs"))
+            promoteAndMerge(fs, f.getPath(), archive);
+        }
+      }
+
+      fs.delete(pOutputDir, true);
+      
+    } catch (Exception e) {
+      log.error("sink archiving failed", e);
+    }
+  }
+  
+  /**
+   * Moves every closed (*.done) data-sink file in pSource into pMRInputDir.
+   */
+  private void selectInputs(FileSystem fs, Path pSource,
+      Path pMRInputDir) throws IOException {
+    
+    FileStatus[] dataSinkFiles = fs.listStatus(pSource, DATA_SINK_FILTER);
+    for(FileStatus fstatus: dataSinkFiles) {
+      boolean rename = fs.rename(fstatus.getPath(),pMRInputDir);
+      log.info("Moving " + fstatus.getPath() + " to " + pMRInputDir 
+          +", status is: " + rename);
+    }
+    
+  }
+
+  /**
+   * Runs ChukwaArchiveBuilder over in, writing to out.  The grouping mode is
+   * read from conf option archive.grouper (default "DataType").
+   *
+   * @return the tool's exit status; 0 means success
+   */
+  public int runMapRedJob(Configuration conf, String in, String out)
+    throws Exception {
+    String grouper = conf.get("archive.grouper","DataType");
+    String[] args = new String[] {grouper, in, out};
+    int res = ToolRunner.run(conf, new ChukwaArchiveBuilder(),
+        args);
+    return res;
+  }
+  
+  /**
+   * Moves src into the directory dest, merging with what is already there.
+   * If dest has no entry named like src, src is renamed into dest (dest is
+   * created if needed).  If such an entry exists and src is a directory, its
+   * children are merged in recursively.  Otherwise src is a colliding file
+   * and gets a numeric suffix (foo-0.arc, foo-1.arc, ...) so that nothing
+   * already archived is overwritten.
+   */
+  public void promoteAndMerge(FileSystem fs, Path src, Path dest) 
+  throws IOException {
+    FileStatus stat = fs.getFileStatus(src);
+    String baseName = src.getName();
+    Path target = new Path(dest, baseName);
+    if(!fs.exists(target)) {
+      //rename onto the full target path: renaming onto dest itself would
+      //make src become dest whenever dest doesn't exist yet.
+      fs.mkdirs(dest);
+      moveAndLog(fs, src, target);
+    } else if(stat.isDir()) {//recurse
+      FileStatus[] files = fs.listStatus(src);
+      for(FileStatus f: files) {
+        promoteAndMerge(fs, f.getPath(), target);
+      }
+    } else { //append a number to unique-ify filename
+      int i=0;
+      do {
+        //FIXME: can make this more generic
+        String newName;
+        if(baseName.endsWith(".arc")) {
+          newName = baseName.substring(0, baseName.length() - 4) + "-"+i+".arc";
+        }
+        else
+          newName = baseName+"-"+i;
+        target = new Path(dest, newName);
+        i++; //try the next suffix if this one is taken
+      } while(fs.exists(target));
+      moveAndLog(fs, src, target);
+    }
+
+  }
+
+  /** Renames src to target, logging the outcome; a failed rename is an error. */
+  private void moveAndLog(FileSystem fs, Path src, Path target)
+      throws IOException {
+    if(fs.rename(src, target))
+      log.info("promoting " + src + " to " + target);
+    else
+      log.error("failed to promote " + src + " to " + target);
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
Mon Jul 13 19:33:38 2009
@@ -99,7 +99,7 @@
    */
   public void start() throws Exception {
 
-     String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_DEMUX_ROOT_DIR_NAME);
+     String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
      if ( ! chukwaRootDir.endsWith("/") ) {
        chukwaRootDir += "/";
      }
@@ -325,6 +325,7 @@
        return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
      } catch (Throwable e) {
        e.printStackTrace();
+       log.error("failed to run demux", e);
        globalErrorcounter ++;
      }
      return false;
@@ -343,8 +344,8 @@
        String dataSinkDir, String demuxInputDir) throws IOException {
      Path pDataSinkDir = new Path(dataSinkDir);
      Path pDemuxInputDir = new Path(demuxInputDir);
-     log.info("dataSinkDir" + dataSinkDir);
-     log.info("demuxInputDir" + demuxInputDir);
+     log.info("dataSinkDir: " + dataSinkDir);
+     log.info("demuxInputDir: " + demuxInputDir);
 
 
      boolean containsFile = false;

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
(original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
Mon Jul 13 19:33:38 2009
@@ -25,7 +25,8 @@
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
 import junit.framework.TestCase;
-
+//Note this test takes a minimum of 
+// 20 * 2 + 6* 20 = 160 seconds. 
 public class TestAgent extends TestCase {
 
   public void testStopAndStart() {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java?rev=793678&r1=793677&r2=793678&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
(original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
Mon Jul 13 19:33:38 2009
@@ -1,10 +1,20 @@
 /*
- * Copyright (C) The Apache Software Foundation. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * This software is published under the terms of the Apache Software
- * License version 1.1, a copy of which has been included with this
- * distribution in the LICENSE.txt file.  */
-
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.extraction.archive;
 
 import java.io.IOException;
@@ -43,6 +53,7 @@
        System.out.println( p.getName() );
    }
   
+   long lastSeqID = 0;
   public ChunkImpl getARandomChunk() {
     int ms = r.nextInt(1000);
     String line = "2008-05-29 10:42:22," + ms
@@ -50,7 +61,8 @@
         + r.nextInt() + "\n";
 
     ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
-        line.length() - 1L, line.getBytes(), null);
+        line.length() - 1L + lastSeqID, line.getBytes(), null);
+    lastSeqID += line.length();
     c.addTag("cluster=\"foocluster\"");
     return c;
   }
@@ -87,55 +99,96 @@
   static final int NUM_HADOOP_SLAVES = 1;
   static final Path DATASINK = new Path("/chukwa/logs/*");
   static final Path DATASINKFILE = new Path("/chukwa/logs/foobar.done");
-  static final Path OUTPUT_DIR = new Path("/chukwa/archives/");
+  static final Path DATASINK_NOTDONE = new Path("/chukwa/logs/foo.chukwa");
+  static final Path DEST_FILE = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29.arc");
+  static final Path MERGED_DATASINK = new Path("/chukwa/archive/foocluster/HadoopLogProcessor_2008_05_29-0.arc");
+  static final Path OUTPUT_DIR = new Path("/chukwa/archive/");
+  static final int CHUNKCOUNT = 1000;
+  
+
   
   /**
    * Writes a single chunk to a file, checks that archiver delivers it
    * to an archive file with correct filename.
    */
   public void testArchiving() throws Exception {
+    FileSystem fileSys;
+    MiniMRCluster mr;
+    JobConf jc ;
     
     System.out.println("starting archive test");
     Configuration conf = new Configuration();
-    System.setProperty("hadoop.log.dir", System.getProperty(
-        "test.build.data", "/tmp"));
-    MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
-        null);
-    FileSystem fileSys = dfs.getFileSystem();
-    fileSys.delete(OUTPUT_DIR, true);//nuke output dir
-
-    writeASinkFile(conf, fileSys, DATASINKFILE, 1000);
     
-    FileStatus fstat = fileSys.getFileStatus(DATASINKFILE);
-    assertTrue(fstat.getLen() > 10);
-    
-    System.out.println("filesystem is " + fileSys.getUri());
-    conf.set("fs.default.name", fileSys.getUri().toString());
     conf.setInt("io.sort.mb", 1);
     conf.setInt("io.sort.factor", 5);
     conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
     conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
     conf.set("archive.groupByClusterName", "true");
     
-    MiniMRCluster mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
+    System.setProperty("hadoop.log.dir", System.getProperty(
+        "test.build.data", "/tmp"));
+
+    MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
+        null);
+    fileSys = dfs.getFileSystem();
+    conf.set("fs.default.name", fileSys.getUri().toString());
+    
+    System.out.println("filesystem is " + fileSys.getUri());
+
+    
+    mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
         .toString(), 1);
+    jc = mr.createJobConf(new JobConf(conf));
+   
+
+    fileSys.delete(new Path("/chukwa"), true);//nuke sink
+
+    writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
+    
+    FileStatus fstat = fileSys.getFileStatus(DATASINKFILE);
+    long dataLen = fstat.getLen();
+    assertTrue(dataLen > CHUNKCOUNT * 50);
+    
     String[] archiveArgs = {"DataType", fileSys.getUri().toString() + DATASINK.toString(),
         fileSys.getUri().toString() +OUTPUT_DIR.toString() };
     
-    JobConf jc = mr.createJobConf(new JobConf(conf));
     assertEquals("true", jc.get("archive.groupByClusterName"));
     assertEquals(1, jc.getInt("io.sort.mb", 5));
     
     int returnVal = ToolRunner.run(jc,  new ChukwaArchiveBuilder(), archiveArgs);
     assertEquals(0, returnVal);
-    fstat = fileSys.getFileStatus(new Path("/chukwa/archives/foocluster/HadoopLogProcessor_2008_05_29.arc"));
-    assertTrue(fstat.getLen() > 10);    
+    fstat = fileSys.getFileStatus(DEST_FILE);
+    assertEquals(dataLen, fstat.getLen());    
     
     Thread.sleep(1000);
-    browseDir(fileSys, new Path("/"), 0);    //OUTPUT_DIR, 0);
-    System.out.println("done!");
     
+    SinkArchiver a = new SinkArchiver();
+    fileSys.delete(new Path("/chukwa"), true);
+
+    writeASinkFile(jc, fileSys, DATASINKFILE, CHUNKCOUNT);
+    writeASinkFile(jc, fileSys, DATASINK_NOTDONE, 50);
+    writeASinkFile(jc, fileSys, DEST_FILE, 10);
+    
+    long doneLen = fileSys.getFileStatus(DATASINKFILE).getLen();
+    long notDoneLen = fileSys.getFileStatus(DATASINK_NOTDONE).getLen();
+    long archFileLen = fileSys.getFileStatus(DEST_FILE).getLen();
+
+    //we now have three files: one closed datasink, one "unfinished" datasink,
+    //and one archived.  After the merge there should be two archive files
+    //(the original and a "-0" copy), plus the untouched "unfinished" datasink
+    
+    a.exec(fileSys, jc);
+
+    browseDir(fileSys, new Path("/"), 0);    //OUTPUT_DIR, 0);
     
+      //make sure we don't scramble anything
+    assertEquals(notDoneLen, fileSys.getFileStatus(DATASINK_NOTDONE).getLen());
+    assertEquals(archFileLen, fileSys.getFileStatus(DEST_FILE).getLen());
+    //and make sure promotion worked right
+
+    assertEquals(doneLen, fileSys.getFileStatus(MERGED_DATASINK).getLen());
+    mr.shutdown();
+    dfs.shutdown();
     
   }
   



Mime
View raw message