chukwa-commits mailing list archives

From asrab...@apache.org
Subject svn commit: r789469 - in /hadoop/chukwa/trunk: ./ conf/ src/java/org/apache/hadoop/chukwa/extraction/archive/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ src/test/org/apache/hadoop/chukwa/datacollection/writer/ src/test/org/apa...
Date Mon, 29 Jun 2009 22:15:37 GMT
Author: asrabkin
Date: Mon Jun 29 22:15:37 2009
New Revision: 789469

URL: http://svn.apache.org/viewvc?rev=789469&view=rev
Log:
CHUKWA-25. Archiver can group by cluster name

Added:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Jun 29 22:15:37 2009
@@ -34,6 +34,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-25.  Archiver can group by cluster name. (asrabkin)
+    
     CHUKWA-308. Added capability to start hicc without tomcat. (Eric Yang)
 
     CHUKWA-326. If the chukwa records produce errors in post process, move to InErrorDirectory. (Jerome Boulon via Eric Yang)

Modified: hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template Mon Jun 29 22:15:37 2009
@@ -27,7 +27,19 @@
 	  files.  This determines the number of open file handles.</description>
 	</property>
 
-<!-- -->
+<!-- Archive configuration -->
+  
+    <property>
+       <name>archive.grouper</name>
+       <value>Stream</value>
+       <description>How to group archive files. Choices are Hourly, Daily, DataType, and Stream.</description>
+    </property>
+
+    <property>
+       <name>archive.groupByClusterName</name>
+       <value>false</value>
+       <description>Whether the DataType grouper should group archived files by cluster name.</description>
+    </property>
 
 <!-- PostProcessorManager config -->
   <property>

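For context, a sketch of how these two new knobs might be set in chukwa-demux-conf.xml to get per-cluster, per-datatype archives (the DataType/true values below are illustrative choices, not the shipped defaults):

    <property>
       <name>archive.grouper</name>
       <value>DataType</value>
    </property>

    <property>
       <name>archive.groupByClusterName</name>
       <value>true</value>
    </property>
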
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java Mon Jun 29 22:15:37 2009
@@ -36,12 +36,21 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
+/**
+ * Main class for the MapReduce job that archives Chunks.
+ * 
+ * The map and reduce classes are both the identity; the actual archiving
+ * logic lives in the Partitioner and OutputFormat classes, which are
+ * selected by the first command-line argument.
+ */
 public class ChukwaArchiveBuilder extends Configured implements Tool {
   static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
 
   static int printUsage() {
     System.out
-        .println("ChuckwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
+        .println("ChukwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
     return -1;
   }
@@ -54,8 +63,7 @@
           + " instead of 3.");
       return printUsage();
     }
-
-    JobConf jobConf = new JobConf(new ChukwaConfiguration(), ChukwaArchiveBuilder.class);
+    JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
 
     jobConf.setInputFormat(SequenceFileInputFormat.class);
 
@@ -71,7 +79,7 @@
       jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
       jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
     } else if (args[0].equalsIgnoreCase("DataType")) {
-      jobConf.setJobName("Chukwa-HourlyArchiveBuilder-DataType");
+      jobConf.setJobName("Chukwa-ArchiveBuilder-DataType");
       int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
       log.info("Reduce Count:" + reduceCount);
       jobConf.setNumReduceTasks(reduceCount);
@@ -103,7 +111,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(),
+    int res = ToolRunner.run(new ChukwaConfiguration(), new ChukwaArchiveBuilder(),
         args);
     System.exit(res);
   }

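A minimal driver sketch for the builder, mirroring the ToolRunner call in main() above; the grouper name comes from the usage string, and the class name and input/output paths are hypothetical:

    package org.apache.hadoop.chukwa.extraction.archive;

    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.util.ToolRunner;

    public class ArchiveDriverSketch {
      public static void main(String[] args) throws Exception {
        // Group archive output by data type; "Stream", "Daily", and
        // "Hourly" are the other accepted groupers.
        String[] jobArgs = { "DataType",
            "/chukwa/archivesProcessing/*/*.done",   // hypothetical sink files
            "/chukwa/archivesMROutput" };            // hypothetical output dir
        System.exit(ToolRunner.run(new ChukwaConfiguration(),
            new ChukwaArchiveBuilder(), jobArgs));
      }
    }
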
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java Mon Jun 29 22:15:37 2009
@@ -22,14 +22,31 @@
 import java.text.SimpleDateFormat;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 public class ChukwaArchiveDataTypeOutputFormat extends
     MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
   static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+  boolean useClusterID;
+  
+  public RecordWriter<ChukwaArchiveKey,ChunkImpl>  getRecordWriter(FileSystem fs,
+      JobConf job, String name, Progressable arg3) 
+  throws java.io.IOException{
 
+    log.info("archive.groupByClusterName is " + job.get("archive.groupByClusterName"));
+    useClusterID = "true".equals(job.get("archive.groupByClusterName"));
+
+    return super.getRecordWriter(fs, job, name, arg3);
+  }
+  
   @Override
   protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
       ChunkImpl chunk, String name) {
@@ -39,7 +56,12 @@
           + sdf.format(key.getTimePartition()));
     }
 
-    return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
+    if(useClusterID) {
+      String clusterID = RecordUtil.getClusterName(chunk);
+      return clusterID + "/" + chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
+      + ".arc";
+    } else
+      return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
         + ".arc";
   }
 }

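The upshot for archive layout: with archive.groupByClusterName on, each file gains a per-cluster directory level. A self-contained sketch of the filename logic above, using the values the new TestArchive (below) writes; the class name is hypothetical:

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class ArchiveNameSketch {
      public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
        Calendar cal = Calendar.getInstance();
        cal.set(2008, Calendar.MAY, 29);
        String dataType = "HadoopLogProcessor";
        String clusterID = "foocluster";   // from the chunk's cluster tag
        // Grouping on: foocluster/HadoopLogProcessor_2008_05_29.arc
        System.out.println(clusterID + "/" + dataType + "_"
            + sdf.format(cal.getTime()) + ".arc");
        // Grouping off: HadoopLogProcessor_2008_05_29.arc
        System.out.println(dataType + "_" + sdf.format(cal.getTime()) + ".arc");
      }
    }
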
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java Mon Jun 29 22:15:37 2009
@@ -22,6 +22,7 @@
 import java.text.SimpleDateFormat;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
@@ -29,13 +30,21 @@
     Partitioner<ChukwaArchiveKey, ChunkImpl> {
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
 
-  public void configure(JobConf arg0) {
+  boolean useClusterID = false;
+  public void configure(JobConf conf) {
+    useClusterID = "true".equals(conf.get("archive.groupByClusterName"));
   }
 
   public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
       int numReduceTasks) {
-
-    return ((chunk.getDataType() + "_" + sdf.format(key.getTimePartition()))
+    
+    if(useClusterID) {
+      String clusterID = RecordUtil.getClusterName(chunk);
+      return ((chunk.getDataType() + "_" + clusterID + "_" + sdf.format(key.getTimePartition()))
+          .hashCode() & Integer.MAX_VALUE)
+          % numReduceTasks;
+    } else
+      return ((chunk.getDataType() + "_" + sdf.format(key.getTimePartition()))
         .hashCode() & Integer.MAX_VALUE)
         % numReduceTasks;
   }

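One note on the partition arithmetic above: String.hashCode() can be negative, and Java's % takes the sign of its left operand, so the "& Integer.MAX_VALUE" clears the sign bit to keep the partition in [0, numReduceTasks). A standalone illustration (hypothetical class and key value):

    public class PartitionSketch {
      public static void main(String[] args) {
        int numReduceTasks = 4;
        // Same key shape the partitioner builds when grouping by cluster.
        String key = "SysLog_foocluster_2008_05_29";
        int raw = key.hashCode();  // may be negative
        int partition = (raw & Integer.MAX_VALUE) % numReduceTasks;
        System.out.println(partition);  // always 0..3, never negative
      }
    }
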
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java Mon Jun 29 22:15:37 2009
@@ -210,7 +210,9 @@
   public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
       String finalArchiveOutput) throws Exception {
     String[] args = new String[3];
-    args[0] = "Stream";
+    
+    
+    args[0] = conf.get("archive.grouper","Stream");
     args[1] = archivesMRInputDir + "*/*.done" ;
     args[2] = archivesMROutputDir;
     

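Since the grouper is now read with conf.get("archive.grouper","Stream"), existing deployments keep the old Stream grouping unless they opt in. A quick sketch of that default semantics (hypothetical class name):

    import org.apache.hadoop.conf.Configuration;

    public class GrouperDefaultSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key absent: the default wins.
        System.out.println(conf.get("archive.grouper", "Stream")); // Stream
        conf.set("archive.grouper", "DataType");
        System.out.println(conf.get("archive.grouper", "Stream")); // DataType
      }
    }
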
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Mon Jun 29 22:15:37 2009
@@ -39,9 +39,9 @@
   int currentPos = 0;
   int startOffset = 0;
 
-  ChukwaArchiveKey archiveKey = null;
-  ChukwaRecordKey key = new ChukwaRecordKey();
-  Chunk chunk = null;
+  protected ChukwaArchiveKey archiveKey = null;
+  protected ChukwaRecordKey key = new ChukwaRecordKey();
+  protected Chunk chunk = null;
 
   boolean chunkInErrorSaved = false;
   OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=789469&r1=789468&r2=789469&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Mon Jun 29 22:15:37 2009
@@ -83,7 +83,7 @@
       
       Assert.assertFalse(seqWriterFile == null);
       
-      String seqWriterDump = dumpArachive(fs,conf,seqWriterFile);
+      String seqWriterDump = dumpArchive(fs,conf,seqWriterFile);
       
       Configuration confLocalWriter = new Configuration();
       confSeqWriter.set("writer.hdfs.filesystem", "file:///");
@@ -110,7 +110,7 @@
       }
       
       Assert.assertFalse(localWriterFile == null);
-      String localWriterDump = dumpArachive(fs,conf,localWriterFile);
+      String localWriterDump = dumpArchive(fs,conf,localWriterFile);
 
       Assert.assertTrue(seqWriterDump.intern() == localWriterDump.intern());
 
@@ -123,7 +123,7 @@
     
   }
   
-  protected String dumpArachive(FileSystem fs,Configuration conf, String file) throws Throwable {
+  protected String dumpArchive(FileSystem fs,Configuration conf, String file) throws Throwable {
     SequenceFile.Reader reader = null;
     try {
       reader = new SequenceFile.Reader(fs, new Path(file), conf);

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java?rev=789469&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/archive/TestArchive.java Mon Jun 29 22:15:37 2009
@@ -0,0 +1,142 @@
+/*
+ * Copyright (C) The Apache Software Foundation. All rights reserved.
+ *
+ * This software is published under the terms of the Apache Software
+ * License version 1.1, a copy of which has been included with this
+ * distribution in the LICENSE.txt file.  */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import java.io.IOException;
+import java.util.Calendar;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ToolRunner;
+import junit.framework.TestCase;
+
+public class TestArchive extends TestCase {
+
+  java.util.Random r = new java.util.Random();
+  
+   public void browseDir(FileSystem fs, Path p, int d) throws IOException {
+     for(int i=0; i< d; ++i) {
+       System.out.print(" |");
+     }
+     FileStatus stat = fs.getFileStatus(p);
+     if(stat.isDir()) {
+       System.out.println(" \\ " + p.getName());
+       FileStatus[] files = fs.listStatus(p);
+       for(FileStatus f: files) {
+         browseDir(fs, f.getPath(), d+1);
+       }
+     }
+     else
+       System.out.println( p.getName() );
+   }
+  
+  public ChunkImpl getARandomChunk() {
+    int ms = r.nextInt(1000);
+    String line = "2008-05-29 10:42:22," + ms
+        + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here"
+        + r.nextInt() + "\n";
+
+    ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
+        line.length() - 1L, line.getBytes(), null);
+    c.addTag("cluster=\"foocluster\"");
+    return c;
+  }
+  
+  public void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
+      int chunks) throws IOException {
+    FSDataOutputStream out = fileSys.create(dest);
+
+    Calendar calendar = Calendar.getInstance();
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+        ChukwaArchiveKey.class, ChunkImpl.class,
+        SequenceFile.CompressionType.NONE, null);
+    for (int i = 0; i < chunks; ++i) {
+      ChunkImpl chunk = getARandomChunk();
+      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+      calendar.set(Calendar.YEAR, 2008);
+      calendar.set(Calendar.MONTH, Calendar.MAY);
+      calendar.set(Calendar.DAY_OF_MONTH, 29);
+      calendar.set(Calendar.HOUR, 10);
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      archiveKey.setTimePartition(calendar.getTimeInMillis());
+      archiveKey.setDataType(chunk.getDataType());
+      archiveKey.setStreamName(chunk.getStreamName());
+      archiveKey.setSeqId(chunk.getSeqID());
+      seqFileWriter.append(archiveKey, chunk);
+    }
+    seqFileWriter.close();
+    out.close();
+  }
+
+  static final int NUM_HADOOP_SLAVES = 1;
+  static final Path DATASINK = new Path("/chukwa/logs/*");
+  static final Path DATASINKFILE = new Path("/chukwa/logs/foobar.done");
+  static final Path OUTPUT_DIR = new Path("/chukwa/archives/");
+  
+  /**
+   * Writes a single chunk to a file, checks that archiver delivers it
+   * to an archive file with correct filename.
+   */
+  public void testArchiving() throws Exception {
+    
+    System.out.println("starting archive test");
+    Configuration conf = new Configuration();
+    System.setProperty("hadoop.log.dir", System.getProperty(
+        "test.build.data", "/tmp"));
+    MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
+        null);
+    FileSystem fileSys = dfs.getFileSystem();
+    fileSys.delete(OUTPUT_DIR, true);//nuke output dir
+
+    writeASinkFile(conf, fileSys, DATASINKFILE, 1000);
+    
+    FileStatus fstat = fileSys.getFileStatus(DATASINKFILE);
+    assertTrue(fstat.getLen() > 10);
+    
+    System.out.println("filesystem is " + fileSys.getUri());
+    conf.set("fs.default.name", fileSys.getUri().toString());
+    conf.setInt("io.sort.mb", 1);
+    conf.setInt("io.sort.factor", 5);
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
+    conf.set("archive.groupByClusterName", "true");
+    
+    MiniMRCluster mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
+        .toString(), 1);
+    String[] archiveArgs = {"DataType", fileSys.getUri().toString() + DATASINK.toString(),
+        fileSys.getUri().toString() +OUTPUT_DIR.toString() };
+    
+    JobConf jc = mr.createJobConf(new JobConf(conf));
+    assertEquals("true", jc.get("archive.groupByClusterName"));
+    assertEquals(1, jc.getInt("io.sort.mb", 5));
+    
+    int returnVal = ToolRunner.run(jc,  new ChukwaArchiveBuilder(), archiveArgs);
+    assertEquals(0, returnVal);
+    fstat = fileSys.getFileStatus(new Path("/chukwa/archives/foocluster/HadoopLogProcessor_2008_05_29.arc"));
+    assertTrue(fstat.getLen() > 10);    
+    
+    Thread.sleep(1000);
+    browseDir(fileSys, new Path("/"), 0);    //OUTPUT_DIR, 0);
+    System.out.println("done!");
+    
+    
+    
+  }
+  
+}


