chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r902715 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/extraction/demux/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/
Date Mon, 25 Jan 2010 05:41:09 GMT
Author: eyang
Date: Mon Jan 25 05:41:09 2010
New Revision: 902715

URL: http://svn.apache.org/viewvc?rev=902715&view=rev
Log:
CHUKWA-440. Enable addon jar file for Demux from Distributed Cache. (Eric Yang)

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=902715&r1=902714&r2=902715&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Jan 25 05:41:09 2010
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-440. Enable addon jar file for Demux from Distributed Cache. (Eric Yang)
+
     CHUKWA-448. Write-ahead buffering for arbitrary adaptors. (asrabkin)
 
     CHUKWA-441. Added real time Hadoop activity monitor. (Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=902715&r1=902714&r2=902715&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Mon Jan 25 05:41:09 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.extraction.demux;
 
 
+import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -34,8 +35,12 @@
 import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -145,6 +150,21 @@
     return -1;
   }
 
+  public static void addParsers(Configuration conf) {
+    String parserPath = conf.get("chukwa.data.dir")+File.separator+"demux";
+    try {
+      FileSystem fs = FileSystem.get(new Configuration());
+      FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
+      if(fstatus!=null) {
+        for(FileStatus parser : fstatus) {
+          DistributedCache.addFileToClassPath(parser.getPath(), conf);
+        }
+      }
+    } catch (IOException e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+    }
+  }
+  
   public int run(String[] args) throws Exception {
     JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
     
@@ -159,7 +179,8 @@
     conf.setOutputValueClass(ChukwaRecord.class);
     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
     conf.setJobPriority(JobPriority.VERY_HIGH);
-
+    addParsers(conf);
+    
     List<String> other_args = new ArrayList<String>();
     for (int i = 0; i < args.length; ++i) {
       try {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java?rev=902715&r1=902714&r2=902715&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java Mon Jan 25 05:41:09 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -25,8 +26,11 @@
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.NagiosHelper;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +57,7 @@
   
   protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
   protected volatile boolean isRunning = true;
+  private final static String demuxPath = System.getenv("CHUKWA_HOME")+File.separator+"lib"+File.separator+"demux";
 
   final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
     public boolean accept(Path file) {
@@ -314,13 +319,14 @@
     * @return true id Demux succeed
     */
    protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
-     String[] demuxParams = new String[4];
-     demuxParams[0] = "-r";
-     demuxParams[1] = "" + demuxReducerCount;
-
-     demuxParams[2] = demuxInputDir;
-     demuxParams[3] = demuxOutputDir;
-
+     String[] demuxParams;
+     int i=0;
+     Demux.addParsers(conf);
+     demuxParams = new String[4];
+     demuxParams[i++] = "-r";
+     demuxParams[i++] = "" + demuxReducerCount;
+     demuxParams[i++] = demuxInputDir;
+     demuxParams[i++] = demuxOutputDir;
      try {
        return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
      } catch (Throwable e) {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java?rev=902715&r1=902714&r2=902715&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java Mon Jan 25 05:41:09 2010
@@ -27,7 +27,7 @@
 
   private static HashMap<String, MapProcessor> processors = new HashMap<String, MapProcessor>(); // registry
 
-  private MapProcessorFactory() {
+  public MapProcessorFactory() {
   }
 
   public static MapProcessor getProcessor(String parserClass)



Mime
View raw message