chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r805891 - in /hadoop/chukwa/trunk: ./ contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/ contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/
Date: Wed, 19 Aug 2009 17:15:49 GMT
Author: asrabkin
Date: Wed Aug 19 17:15:49 2009
New Revision: 805891

URL: http://svn.apache.org/viewvc?rev=805891&view=rev
Log:
CHUKWA-377. Fix xtrace code in contrib

Added:
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/CausalGraph.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=805891&r1=805890&r2=805891&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Aug 19 17:15:49 2009
@@ -122,6 +122,8 @@
 
   BUG FIXES
 
+    CHUKWA-377. Revised xtrace adaptor code. (asrabkin)
+
     CHUKWA-372. select_multiple_callback does not work in hicc (Eric Yang via asrabkin)
 
     CHUKWA-354. Fix corner case in SinkArchiver. (asrabkin)

Added: hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/CausalGraph.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/CausalGraph.java?rev=805891&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/CausalGraph.java (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/CausalGraph.java Wed Aug 19 17:15:49 2009
@@ -0,0 +1,280 @@
+package edu.berkeley.chukwa_xtrace;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.File;
+import java.util.*;
+import org.apache.hadoop.io.Text;
+import edu.berkeley.chukwa_xtrace.XtrExtract.PtrReverse;
+import edu.berkeley.xtrace.reporting.Report;
+
+/**
+ * Encapsulates a causal graph; nodes are xtrace reports.
+ *
+ */
+public class CausalGraph implements Iterable<Report> {
+  
+  /**
+   * Returns the distance from report src to dest.
+   * Should be positive if dest happened after src
+   * @author asrabkin
+   *
+   */
+  public static class RDistMetric {
+    long dist(Report src, Report dest) {
+      return getTS(src);
+    }
+  }
+  
+  public static class IgnoreReportsMetric extends RDistMetric {
+    List<Report> pathToIgnore;
+    /**
+     * Path should be reverse-ordered, same as longestPath returns
+     * @param pathToIgnore
+     */
+    public IgnoreReportsMetric(List<Report> pathToIgnore) {
+      this.pathToIgnore = pathToIgnore;
+    }
+    
+    @Override
+    long dist(Report src, Report dest) {
+      for(int i =0; i < pathToIgnore.size()-1; ++i) {
+        if((pathToIgnore.get(i+1) == src) && (pathToIgnore.get(i) == dest))
+          return 0;
+      }
+      return getTS(src);
+    }
+    
+  }
+  
+  
+  private Map<String, Report> reports;
+  private Report start;
+  private Report end;
+  
+  public CausalGraph(Map<String, Report> reports) {
+    this.reports = reports;
+  }
+  
+  public CausalGraph() {
+    reports = new LinkedHashMap<String, Report>();
+  }
+  
+  
+  public Report getStart() {
+    return start;
+  }
+
+  public void setStart(Report start) {
+    this.start = start;
+  }
+
+  public Report getEnd() {
+    return end;
+  }
+
+  public void setEnd(Report end) {
+    this.end = end;
+  }
+
+  public void add(Report r) {
+    String opID = r.getMetadata().getOpIdString();
+    reports.put(opID, r);
+  }
+  
+  
+  ///////  Graph-analytic functions
+  
+  public Set<Report> predecessors(Report p) {
+    
+    HashSet<Report> predecessors = new HashSet<Report>();
+    Queue<Report> bfsQ = new LinkedList<Report>();
+    bfsQ.add(p);
+    while(!bfsQ.isEmpty()) {
+      Report r = bfsQ.remove();
+
+      assert r!= null;
+      predecessors.add(r);
+      List<String> backEdges = r.get("Edge");
+      if(backEdges != null)
+        for(String pred:backEdges) {
+          Report pre = reports.get(pred);
+          if(pre != null)
+            bfsQ.add(pre);
+        }
+    }
+    
+    return predecessors;
+  }
+  
+  public List<Report> topoSort(PtrReverse reverser) {
+    HashMap<String, Integer> counts = new HashMap<String, Integer>();
+    Queue<Report> zeroInlinkReports = new LinkedList<Report>();
+
+    //FIXME: could usefully compare reports.size() with numReports;
+    //that would measure duplicate reports
+    
+    //increment link counts for children
+    for(Report r: reports.values()){ 
+      String myOpID = r.getMetadata().getOpIdString();
+      
+      int parentCount = reverser.setupForwardPointers(reports, r, myOpID);
+        
+      //if there weren't any parents, we can dequeue
+      if(parentCount == 0)
+        zeroInlinkReports.add(r);
+      else
+        counts.put(myOpID, parentCount);
+    }
+    
+    //at this point, we have a map from metadata to report, and also
+    //from report op ID to inlink count.
+    //next step is to do a topological sort.
+
+    ArrayList<Report> finalOutput = new ArrayList<Report>();
+    while(!zeroInlinkReports.isEmpty()) {
+      Report r = zeroInlinkReports.remove();
+      finalOutput.add(r);
+      
+      List<String> outLinks =  r.get(XtrExtract.OUTLINK_FIELD);
+      if(outLinks != null) {
+        for(String outLink: outLinks) {
+          Integer oldCount = counts.get(outLink);
+          if(oldCount == null) {
+            oldCount = 0;  //FIXME: can this happen?
+              //Means we have a forward-edge to a node which we haven't ever set up a link count for
+      //      log.warn(taskIDString+": found an in-edge where none was expected");
+          } if(oldCount == 1) {
+            zeroInlinkReports.add(reports.get(outLink));
+          }
+          counts.put(outLink, oldCount -1);
+        }
+      }
+    }
+    return finalOutput;
+  }
+  
+  
+  
+  ///////  Performance-analytic functions
+  
+  private static final long getTS(Report r) {
+    List<String> staTL = r.get("Timestamp");
+    if(staTL != null && staTL.size() > 0) {
+
+      double t = Double.parseDouble(staTL.get(0));
+      return Math.round(1000 * t);
+    }
+    return Long.MIN_VALUE;      
+  }
+  
+  /**
+   * Returns the longest path ending at endID
+   * 
+   * Path is in reversed order, starting with endID and going forwards.
+   * 
+   * @param endID
+   * @return
+   */
+  
+  public List<Report> longestPath(String endID) {
+    return longestPath(new RDistMetric(), endID);
+  }
+
+  public List<Report> longestPath(RDistMetric metric, String endID) {
+    //if we have the reports in topological order, this should be easy.
+    //Just take max of all predecessors seen until that point.
+    
+    //alternatively, could start at the end and walk backwards
+    ArrayList<Report> backpath = new ArrayList<Report>();
+    Report cur = reports.get(endID);
+    do {
+      backpath.add(cur);
+      
+      Report limitingPred = null;
+      long latestPrereq = Long.MIN_VALUE;
+      
+      for(String predID: cur.get("Edge")) {
+        Report pred = reports.get(predID);
+        long finishTime = metric.dist(pred, cur);
+        if( finishTime > latestPrereq) {
+          latestPrereq = finishTime;
+          limitingPred = pred;
+        }
+      }
+      cur = limitingPred;
+    } while(cur != null && cur.get("Edge") != null);
+    
+    //should be able to just walk forward, keeping track
+    return backpath;
+  }
+  
+  /**
+   * Expect path to be sorted backwards.
+   * @param path
+   * @return
+   */
+  public static long onHostTimes(List<Report> path) {
+    long time =0;   
+    for(int i =0; i < path.size()-1; ++i) {
+      Report src = path.get(i+1);
+      Report dest = path.get(i);
+      List<String> srcHost = src.get("Host"), destHost = dest.get("Host");
+      if(srcHost != null && srcHost.size() > 0 && destHost != null && destHost.size() > 0) {
+        if(srcHost.get(0).equals(destHost.get(0))){
+          long src_ts = getTS(src);
+          long dest_ts = getTS(dest);
+          
+          time += (dest_ts - src_ts);
+          System.out.println("adding segment of length " + (dest_ts - src_ts));
+        }
+      }
+    }
+    return time;
+      
+  }
+  
+  
+  ////  Glue to make CausalGraph look like a pseudocollection
+  public Iterator<Report> iterator() {
+    return reports.values().iterator();
+  }
+  
+  public int size() {
+    return reports.size();
+  }
+  
+  public Collection<Report> getReports() {
+    return reports.values();
+  }
+  
+  //////IO utils
+  
+  public void slurpTextFile(File f) throws IOException {
+    
+    BufferedReader br = new BufferedReader(new FileReader(f));
+    Report rep;
+    while((rep = getNextReportFromReader(br)) != null ) {
+      add(rep);
+    }
+  }
+  
+  private Report getNextReportFromReader(BufferedReader br) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    
+    String s;
+    while((s = br.readLine()) != null ) {
+      if(s.length() > 1) {
+        sb.append(s);
+        sb.append("\n");
+      } else    //stop on blank line, if it isn't the first one we see
+        if(sb.length() > 1)
+          break;
+    }
+    if(sb.length() < 1)
+      return null;
+    
+    return Report.createFromString(sb.toString());
+  }
+  
+}
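
A minimal usage sketch for the new CausalGraph class, driven outside MapReduce; it assumes a plain-text file of X-Trace reports separated by blank lines (the format slurpTextFile expects), and the file name and end-of-trace op ID are placeholders, not values taken from this commit:

import java.io.File;
import java.util.List;
import edu.berkeley.chukwa_xtrace.CausalGraph;
import edu.berkeley.xtrace.reporting.Report;

public class CausalGraphDemo {
  public static void main(String[] args) throws Exception {
    CausalGraph g = new CausalGraph();
    g.slurpTextFile(new File("reports.txt"));            // placeholder input file

    // Longest ("critical") path ending at a chosen report, returned in reverse order.
    List<Report> critPath = g.longestPath("0123456789abcdef");   // placeholder op ID

    // Milliseconds spent in path segments whose endpoints ran on the same host.
    long onHost = CausalGraph.onHostTimes(critPath);
    System.out.println(g.size() + " reports loaded, " + critPath.size()
        + " on the critical path, " + onHost + " ms on-host");
  }
}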

Modified: hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java?rev=805891&r1=805890&r2=805891&view=diff
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java (original)
+++ hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java Wed Aug 19 17:15:49 2009
@@ -38,6 +38,7 @@
   
   
   ReportSource rs;
+  String rsName;
   Thread pump = new Thread(this);
   Thread reportSourceThread;
   BlockingQueue<String> q = new ArrayBlockingQueue<String>(1000);
@@ -102,13 +103,14 @@
   }
 
   @Override
-  public String getCurrentStatus() throws AdaptorException {
-    return type;
+  public String getCurrentStatus() {
+    return type +" "+ rsName;
   }
 
   @Override
   public String parseArgs(String params) {
     rs = getXtrReportSource(params);  
+    rsName = params;
     return params; //no optional params 
   }
 

Modified: hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java?rev=805891&r1=805890&r2=805891&view=diff
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java (original)
+++ hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java Wed Aug 19 17:15:49 2009
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import edu.berkeley.xtrace.reporting.Report;
 import edu.berkeley.xtrace.*;
@@ -52,13 +53,24 @@
  */
 public class XtrExtract extends Configured implements Tool {
   
+  
+  /**
+   * Hadoop docs say to do this if you pass an ArrayWritable to reduce.
+   */
+  public static class TextArrayWritable extends ArrayWritable {
+      public TextArrayWritable() { super(Text.class); } 
+
+    } 
+ 
+  
   public static final String OUTLINK_FIELD = "__xtr_outlinks";
+  static Logger log = Logger.getLogger(XtrExtract.class);
   
   /**
-   * with more than 10,000 reports, switch to on-disk sort, 
+   * with more than 50,000 reports in a single trace, switch to on-disk sort, 
    * instead of in-memory topological sort.
    */
-  static final int MAX_IN_MEMORY_REPORTS = 10* 1000;
+  static final int MAX_IN_MEMORY_REPORTS = 50* 1000;
   
 public static class MapClass extends Mapper <Object, Object, BytesWritable, Text> {
     
@@ -71,13 +83,22 @@
         Mapper<Object, Object,BytesWritable, Text>.Context context)
         throws IOException, InterruptedException 
     {
+      Counter unparseableReport = context.getCounter("app", "unparseable chunks");
+      
       Text t;
       BytesWritable bw;
       
       if(k instanceof ChukwaArchiveKey && v instanceof ChunkImpl) {
         ChunkImpl value = (ChunkImpl) v;
         Report xtrReport = Report.createFromString(new String(value.getData()));
-        bw = new BytesWritable(xtrReport.getMetadata().getTaskId().get());
+       
+        try {    //we do this to handle the case where not all input is x-trace
+          bw = new BytesWritable(xtrReport.getMetadata().getTaskId().get());
+        } catch(Exception e) {
+          unparseableReport.increment(1);
+          return;
+        }
+        
         //FIXME: can probably optimize the above lines by doing a search in the raw bytes
         t= new Text(value.getData());
       } else if(k instanceof ChukwaRecordKey && v instanceof ChukwaRecord){
@@ -87,7 +108,7 @@
         //FIXME: can probably optimize the above lines by doing a search in the raw bytes
         t= new Text(value.getValue(Record.bodyField));
       } else {
-        System.out.println("unexpected key/value types: "+ k.getClass().getCanonicalName()

+        log.error("unexpected key/value types: "+ k.getClass().getCanonicalName() 
             + " and " + v.getClass().getCanonicalName() );
         return;
       }
@@ -109,21 +130,24 @@
           Reducer<BytesWritable, Text,BytesWritable,ArrayWritable>.Context context)

           throws IOException, InterruptedException
     {
-      String taskIDString = new String(taskID.getBytes());
+      String taskIDString = IoUtil.bytesToString(taskID.getBytes());
       //in both cases, key is OpId string
       HashMap<String, Report> reports = new LinkedHashMap<String, Report>();
 
-      Counter reportCounter = context.getCounter("app", "reports");
+      Counter reportCounter = context.getCounter("app", "distinct reports");
       Counter edgeCounter = context.getCounter("app", "edges");
       Counter badEdgeCounter = context.getCounter("app", "reference to missing report");
-      int edgeCount = 0;
+      Counter dupCounter = context.getCounter("app", "duplicate report");
+
+      int edgeCount = 0, dups = 0, numReports = 0;
       
-      int numReports = 0;
       for(Text rep_text: values) {
         Report r = Report.createFromString(rep_text.toString());
         numReports++;
         
         if(numReports < MAX_IN_MEMORY_REPORTS) {
+          if(reports.containsKey(r.getMetadata().getOpIdString()))
+            dups++;
           reports.put(r.getMetadata().getOpIdString(), r);
         } else if(numReports == MAX_IN_MEMORY_REPORTS) {
           //bail out, prepare to do an external sort.
@@ -132,92 +156,68 @@
           ;
     //      do the external sort
       }
-
-      HashMap<String, Integer> counts = new HashMap<String, Integer>();
-      Queue<Report> zeroInlinkReports = new LinkedList<Report>();
-      reportCounter.increment(reports.size());
-      //FIXME: could usefully compare reports.size() with numReports;
-      //that would measure duplicate reports
       
-      //increment link counts for children
-      for(Report r: reports.values()){ 
-        String myOpID = r.getMetadata().getOpIdString();
-        int parentCount = 0;
-        for(String inLink: r.get("Edge")) {
-          
-            //sanitize data from old, nonconformant C++ implementation
-          if(inLink.contains(","))
-            inLink = inLink.substring(0, inLink.indexOf(','));
-          
-          Report parent = reports.get(inLink);
-          if(parent != null) {
-            parent.put(OUTLINK_FIELD, myOpID);
-            parentCount++;
-            edgeCount++;
-          }
-          else { //no match
-            if(!inLink.equals("0000000000000000"))  {
-              System.out.println("no sign of parent: " + inLink);
-              badEdgeCounter.increment(1);
-            }
-            //else quietly suppress
-          }
-        }
-          
-        //if there weren't any parents, we can dequeue
-        if(parentCount == 0)
-          zeroInlinkReports.add(r);
+      reportCounter.increment(reports.size());
+      dupCounter.increment(dups);
+      CausalGraph g = new CausalGraph(reports);
+
+      PtrReverse reverser = new PtrReverse();
+      List<Report> sortedReports = g.topoSort(reverser);
+      int sortedLen = sortedReports.size();
+      if(sortedLen!= reports.size()) {
+        if(sortedLen > 0)
+           log.warn(taskIDString+": I only sorted " + sortedLen + " items, but expected "
+            + reports.size()+", is your list cyclic?");
         else
-          counts.put(myOpID, parentCount);
+          log.warn(taskIDString+": every event in graph has a predecessor; perhaps "
+              + "the start event isn't in the input set?");
       }
+      log.debug(taskIDString+": " + reverser.edgeCount + " total edges");
+      edgeCounter.increment(reverser.edgeCount);
+      badEdgeCounter.increment(reverser.badCount);
       
-      System.out.println(taskIDString+": " + edgeCount + " total edges");
-      edgeCounter.increment(edgeCount);
-      //at this point, we have a map from metadata to report, and also
-      //from report op ID to inlink count.
-      //next step is to do a topological sort.
-
-      
-      Text[] finalOutput = new Text[reports.size()];
-      System.out.println(taskIDString+": expecting to sort " + finalOutput.length + " reports");
+      Text[] finalOutput = new Text[sortedReports.size()];
       int i=0;
-      while(!zeroInlinkReports.isEmpty()) {
-        Report r = zeroInlinkReports.poll();
-        if(r == null) {
-          System.err.println("poll returned null but list not empty");
-          break;
-        }
+      for(Report r:sortedReports)
         finalOutput[i++] = new Text(r.toString());
-        List<String> outLinks =  r.get(OUTLINK_FIELD);
-        if(outLinks != null) {
-          for(String outLink: outLinks) {
-            Integer oldCount = counts.get(outLink);
-            if(oldCount == null) {
-              oldCount = 0;  //FIXME: can this happen?
-              System.out.println("warning: found an in-edge where none was expected");
-            } if(oldCount == 1) {
-              zeroInlinkReports.add(reports.get(outLink));
-            }
-            counts.put(outLink, oldCount -1);
-          }
-        }
-      }
-      if(i != finalOutput.length) {
-        if(i > 0)
-           System.out.println("error: I only sorted " + i + " items, but expected " 
-            + finalOutput.length+", is your list cyclic?");
-        else
-          System.out.println("every event in graph has a predecessor; perhaps "
-              + "the start event isn't in the input set?");
-      }
 
-      context.write(taskID, new ArrayWritable(Text.class, finalOutput));
+      TextArrayWritable out = new TextArrayWritable();
+      out.set(finalOutput);
+      context.write(taskID, out);
       //Should sort values topologically and output list.  or?
       
     } //end reduce
     
   }//end reduce class
 
+  public static class PtrReverse {
+    int badCount = 0;
+    int edgeCount = 0;
+    
+    public int setupForwardPointers(Map<String, Report> reports, Report r,
+        String myOpID) {
+      int parentCount =0;
+      for(String inLink: r.get("Edge")) {  
+        //sanitize data from old, nonconformant C++ implementation
+        if(inLink.contains(","))
+          inLink = inLink.substring(0, inLink.indexOf(','));
+        
+        Report parent = reports.get(inLink);
+        if(parent != null) {
+          parent.put(OUTLINK_FIELD, myOpID);
+          parentCount++;
+        } else { //no match
+          if(!inLink.equals("0000000000000000"))  {
+            log.info("no sign of parent: " + inLink);
+            badCount++;
+          }
+          //else quietly suppress
+        }
+      }
+      edgeCount += parentCount;
+      return parentCount;
+    }
+  }
 
   @Override
   public int run(String[] arg) throws Exception {
@@ -234,7 +234,7 @@
     extractor.setMapOutputValueClass(Text.class);
     
     extractor.setOutputKeyClass(BytesWritable.class);
-    extractor.setOutputValueClass(ArrayWritable.class);
+    extractor.setOutputValueClass(TextArrayWritable.class);
     
     extractor.setInputFormatClass(SequenceFileInputFormat.class);
     extractor.setOutputFormatClass(SequenceFileOutputFormat.class);
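
The switch from ArrayWritable to the TextArrayWritable subclass matters when the job's output is read back: SequenceFile instantiates the value class reflectively, and a bare ArrayWritable does not record its element type. A rough sketch of reading the extractor's output with the classic SequenceFile.Reader constructor; the output path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import edu.berkeley.chukwa_xtrace.XtrExtract.TextArrayWritable;

public class ReadExtractedTraces {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path part = new Path("/xtrace/out/part-r-00000");    // placeholder output file

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    BytesWritable taskId = new BytesWritable();
    TextArrayWritable reports = new TextArrayWritable();
    while (reader.next(taskId, reports)) {
      // One record per trace; the value holds the reports in topological order.
      for (Writable report : reports.get())
        System.out.println(report);
    }
    reader.close();
  }
}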

Modified: hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java?rev=805891&r1=805890&r2=805891&view=diff
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java (original)
+++ hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java Wed Aug 19 17:15:49 2009
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.berkeley.chukwa_xtrace;
-
-import java.io.IOException;
-import java.util.Calendar;
-import junit.framework.TestCase;
-import org.apache.hadoop.chukwa.ChukwaArchiveKey;
-import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.util.ToolRunner;
-
-public class TestXtrExtract extends TestCase {
-  
-  public void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
-      int chunks) throws IOException {
-    FSDataOutputStream out = fileSys.create(dest);
-
-    Calendar calendar = Calendar.getInstance();
-    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
-        ChukwaArchiveKey.class, ChunkImpl.class,
-        SequenceFile.CompressionType.NONE, null);
-
-      //FIXME: do write here
-
-    seqFileWriter.close();
-    out.close();
-  }
-  
-  static final int NUM_HADOOP_SLAVES = 1;
-  static final Path OUTPUT_DIR = new Path("/test/out/");
-  static final Path INPUT_DIR = new Path("/test/in/");
-  
- public void testArchiving() throws Exception {
-    
-    System.out.println("starting archive test");
-    Configuration conf = new Configuration();
-    System.setProperty("hadoop.log.dir", System.getProperty(
-        "test.build.data", "/tmp"));
-    MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
-        null);
-    FileSystem fileSys = dfs.getFileSystem();
-    fileSys.delete(OUTPUT_DIR, true);//nuke output dir
-
-    writeASinkFile(conf, fileSys, INPUT_DIR, 1000);
-    
-    FileStatus fstat = fileSys.getFileStatus(INPUT_DIR);
-    assertTrue(fstat.getLen() > 10);
-    
-    System.out.println("filesystem is " + fileSys.getUri());
-    conf.set("fs.default.name", fileSys.getUri().toString());
-    conf.setInt("io.sort.mb", 1);
-    conf.setInt("io.sort.factor", 5);
-    conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
-    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
-    
-    MiniMRCluster mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
-        .toString(), 1);
-    String[] archiveArgs = {INPUT_DIR.toString(),
-        fileSys.getUri().toString() +OUTPUT_DIR.toString() };
-    
-    
-    JobConf jc = mr.createJobConf(new JobConf(conf));
-    assertEquals("true", jc.get("archive.groupByClusterName"));
-    assertEquals(1, jc.getInt("io.sort.mb", 5));
-    
-    int returnVal = ToolRunner.run(jc,  new XtrExtract(), archiveArgs);
-    assertEquals(0, returnVal);
-    fstat = fileSys.getFileStatus(new Path("/chukwa/archives/foocluster/HadoopLogProcessor_2008_05_29.arc"));
-    assertTrue(fstat.getLen() > 10);    
-    
-    Thread.sleep(1000);
-
-    System.out.println("done!");
- }  
-}


