hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r690142 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Fri, 29 Aug 2008 08:19:37 GMT
Author: ddas
Date: Fri Aug 29 01:19:36 2008
New Revision: 690142

URL: http://svn.apache.org/viewvc?rev=690142&view=rev
Log:
HADOOP-3828. Provides a way to write skipped records to DFS. Contributed by Sharad Agarwal.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 29 01:19:36 2008
@@ -108,6 +108,9 @@
 
     HADOOP-3754. Add a thrift interface to access HDFS. (dhruba via omalley)
 
+    HADOOP-3828. Provides a way to write skipped records to DFS.
+    (Sharad Agarwal via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
(original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
Fri Aug 29 01:19:36 2008
@@ -121,13 +121,12 @@
   public void testDisableSkip() throws Exception {
     JobConf clusterConf = createJobConf();
     createInput();
-    
+    int attSkip =0;
+    SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
     //the no of attempts to successfully complete the task depends 
     //on the no of bad records.
-    int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
-                          +1+MAPPER_BAD_RECORDS.size();
-    int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
-                           +1+REDUCER_BAD_RECORDS.size();
+    int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
+    int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
     String[] args =  new String[] {
       "-input", (new Path(getInputDir(), "text.txt")).toString(),
       "-output", getOutputDir().toString(),
@@ -135,6 +134,7 @@
       "-reducer", badReducer,
       "-verbose",
       "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
       "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
       "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
       "-jobconf", "mapred.skip.mode.enabled=false",
@@ -157,13 +157,12 @@
   public void testSkip() throws Exception {
     JobConf clusterConf = createJobConf();
     createInput();
-    
+    int attSkip =0;
+    SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
     //the no of attempts to successfully complete the task depends 
     //on the no of bad records.
-    int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
-                          +1+MAPPER_BAD_RECORDS.size();
-    int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
-                           +1+REDUCER_BAD_RECORDS.size();
+    int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
+    int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
     
     String[] args =  new String[] {
       "-input", (new Path(getInputDir(), "text.txt")).toString(),
@@ -172,6 +171,7 @@
       "-reducer", badReducer,
       "-verbose",
       "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+      "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
       "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
       "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
       "-jobconf", "mapred.map.tasks=1",

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Aug 29 01:19:36
2008
@@ -45,7 +45,9 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -53,6 +55,7 @@
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -143,19 +146,14 @@
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
-    private Iterator<Long> skipFailedRecIndexIterator;
-    private TaskUmbilicalProtocol umbilical;
-    private long recIndex = -1;
     private long beforePos = -1;
     private long afterPos = -1;
     
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters, 
-        TaskUmbilicalProtocol umbilical) {
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) 
+      throws IOException{
       rawIn = raw;
-      this.umbilical = umbilical;
       inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
       inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
-      skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
     }
 
     public K createKey() {
@@ -169,28 +167,22 @@
     public synchronized boolean next(K key, V value)
     throws IOException {
       boolean ret = moveToNext(key, value);
-      if(isSkipping() && ret) {
-        long nextRecIndex = skipFailedRecIndexIterator.next();
-        long skip = nextRecIndex - recIndex;
-        for(int i=0;i<skip && ret;i++) {
-          ret = moveToNext(key, value);
-        }
-        getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
-        reportNextRecordRange(umbilical, nextRecIndex);
-      }
       if (ret) {
-        inputRecordCounter.increment(1);
-        inputByteCounter.increment(afterPos - beforePos);
+        incrCounters();
       }
       return ret;
     }
+    
+    protected void incrCounters() {
+      inputRecordCounter.increment(1);
+      inputByteCounter.increment(afterPos - beforePos);
+    }
      
-    private synchronized boolean moveToNext(K key, V value)
+    protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
       setProgress(getProgress());
       beforePos = getPos();
       boolean ret = rawIn.next(key, value);
-      recIndex++;
       afterPos = getPos();
       return ret;
     }
@@ -202,6 +194,68 @@
     }
   }
 
+  /**
+   * This class skips the records based on the failed ranges from previous 
+   * attempts.
+   */
+  class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
+    private SkipRangeIterator skipIt;
+    private SequenceFile.Writer skipWriter;
+    private TaskUmbilicalProtocol umbilical;
+    private Counters.Counter skipRecCounter;
+    private long recIndex = -1;
+    
+    SkippingRecordReader(RecordReader<K,V> raw, Counters counters, 
+        TaskUmbilicalProtocol umbilical) throws IOException{
+      super(raw,counters);
+      this.umbilical = umbilical;
+      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      skipIt = getFailedRanges().skipRangeIterator();
+    }
+    
+    public synchronized boolean next(K key, V value)
+    throws IOException {
+      boolean ret = moveToNext(key, value);
+      long nextRecIndex = skipIt.next();
+      long skip = nextRecIndex - recIndex;
+      for(int i=0;i<skip && ret;i++) {
+      	writeSkippedRec(key, value);
+        ret = moveToNext(key, value);
+      }
+      //close the skip writer once all the ranges are skipped
+      if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
+        skipWriter.close();
+      }
+      skipRecCounter.increment(skip);
+      reportNextRecordRange(umbilical, nextRecIndex);
+      if (ret) {
+        incrCounters();
+      }
+      return ret;
+    }
+    
+    protected synchronized boolean moveToNext(K key, V value)
+    throws IOException {
+	    recIndex++;
+      return super.moveToNext(key, value);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void writeSkippedRec(K key, V value) throws IOException{
+      if(skipWriter==null) {
+        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+        Path skipFile = new Path(skipDir, getTaskID().toString());
+        skipWriter = 
+          SequenceFile.createWriter(
+              skipFile.getFileSystem(conf), conf, skipFile,
+              (Class<K>) createKey().getClass(),
+              (Class<V>) createValue().getClass(), 
+              CompressionType.BLOCK, getReporter(umbilical));
+      }
+      skipWriter.append(key, value);
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
@@ -244,7 +298,9 @@
       
     RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+    RecordReader in = isSkipping() ? 
+        new SkippingRecordReader(rawIn, getCounters(), umbilical) :
+        new TrackedRecordReader(rawIn, getCounters());
     job.setBoolean("mapred.skip.on", isSkipping());
 
     MapRunnable runner =

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Aug 29 01:19:36
2008
@@ -56,16 +56,20 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.IFile.*;
 import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapred.Task.Counter;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -232,6 +236,10 @@
     @Override
     public VALUE next() {
       reduceInputValueCounter.increment(1);
+      return moveToNext();
+    }
+    
+    protected VALUE moveToNext() {
       return super.next();
     }
     
@@ -243,9 +251,14 @@
 
   private class SkippingReduceValuesIterator<KEY,VALUE> 
      extends ReduceValuesIterator<KEY,VALUE> {
-     private Iterator<Long> skipFailedRecIndexIterator;
+     private SkipRangeIterator skipIt;
      private TaskUmbilicalProtocol umbilical;
+     private Counters.Counter skipGroupCounter;
+     private Counters.Counter skipRecCounter;
      private long recIndex = -1;
+     private Class<KEY> keyClass;
+     private Class<VALUE> valClass;
+     private SequenceFile.Writer skipWriter;
      
      public SkippingReduceValuesIterator(RawKeyValueIterator in,
          RawComparator<KEY> comparator, Class<KEY> keyClass,
@@ -253,7 +266,13 @@
          TaskUmbilicalProtocol umbilical) throws IOException {
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
-       skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
+       this.skipGroupCounter = 
+         getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
+       this.skipRecCounter = 
+         getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+       this.keyClass = keyClass;
+       this.valClass = valClass;
+       skipIt = getFailedRanges().skipRangeIterator();
        mayBeSkip();
      }
      
@@ -264,15 +283,38 @@
      
      private void mayBeSkip() throws IOException {
        recIndex++;
-       long nextRecIndex = skipFailedRecIndexIterator.next();
+       long nextRecIndex = skipIt.next();
        long skip = nextRecIndex - recIndex;
+       long skipRec = 0;
        for(int i=0;i<skip && super.more();i++) {
+         while (hasNext()) {
+           writeSkippedRec(getKey(), moveToNext());
+           skipRec++;
+         }
          super.nextKey();
          recIndex++;
        }
-       getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+       //close the skip writer once all the ranges are skipped
+       if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
+         skipWriter.close();
+       }
+       skipGroupCounter.increment(skip);
+       skipRecCounter.increment(skipRec);
        reportNextRecordRange(umbilical, nextRecIndex);
      }
+     
+     @SuppressWarnings("unchecked")
+     private void writeSkippedRec(KEY key, VALUE value) throws IOException{
+       if(skipWriter==null) {
+         Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+         Path skipFile = new Path(skipDir, getTaskID().toString());
+         skipWriter = SequenceFile.createWriter(
+               skipFile.getFileSystem(conf), conf, skipFile,
+               keyClass, valClass, 
+               CompressionType.BLOCK, getReporter(umbilical));
+       }
+       skipWriter.append(key, value);
+     }
   }
 
   @Override

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java Fri Aug 29 01:19:36
2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Utility class for skip bad records functionality. It contains various 
@@ -33,6 +34,7 @@
     "mapred.skip.map.auto.incr.proc.count";
   private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
     "mapred.skip.reduce.auto.incr.proc.count";
+  private static final String OUT_PATH = "mapred.skip.out.dir";
   
   /**
    * Is skipping of bad records enabled. If it is enabled 
@@ -158,4 +160,31 @@
     conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
   }
   
+  /**
+   * Get the directory to which skipped records are written. By default it is 
+   * the sub directory of the output _logs directory.
+   * @param conf the configuration.
+   * @return path skip output directory. Null is returned if this is not set 
+   * and output directory is also not set.
+   */
+  public static Path getSkipOutputPath(Configuration conf) {
+    String name =  conf.get(OUT_PATH);
+    if(name!=null) {
+      return new Path(name);
+    }
+    Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
+    return outPath==null ? null : new Path(outPath, 
+        "_logs"+Path.SEPARATOR+"skip");
+  }
+  
+  /**
+   * Set the directory to which skipped records are written. By default it is 
+   * the sub directory of the output _logs directory.
+   * @param conf the configuration.
+   * @param path skip output directory path
+   */
+  public static void setSkipOutputPath(JobConf conf, Path path) {
+    conf.set(OUT_PATH, path.toString());
+  }
+  
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Fri Aug 29 01:19:36
2008
@@ -35,28 +35,28 @@
  * Provides the SkipRangeIterator, which skips the Ranges 
  * stored in this object.
  */
-public class SortedRanges implements Writable{
+class SortedRanges implements Writable{
   
   private static final Log LOG = 
     LogFactory.getLog(SortedRanges.class);
   
-  private SortedSet<Range> ranges = new TreeSet<Range>();
-  private int indicesCount;
+  private TreeSet<Range> ranges = new TreeSet<Range>();
+  private long indicesCount;
   
   /**
    * Get Iterator which skips the stored ranges.
    * The Iterator.next() call return the index starting from 0.
-   * @return Iterator<Long>
+   * @return SkipRangeIterator
    */
-  public Iterator<Long> skipRangeIterator(){
-    return new SkipRangeIterator();
+  synchronized SkipRangeIterator skipRangeIterator(){
+    return new SkipRangeIterator(ranges.iterator());
   }
   
   /**
    * Get the no of indices stored in the ranges.
    * @return indices count
    */
-  public synchronized int getIndicesCount() {
+  synchronized long getIndicesCount() {
     return indicesCount;
   }
   
@@ -69,7 +69,7 @@
    * If the range is of 0 length, doesn't do anything.
    * @param range Range to be added.
    */
-  public synchronized void add(Range range){
+  synchronized void add(Range range){
     if(range.isEmpty()) {
       return;
     }
@@ -123,7 +123,7 @@
    * If range is of 0 length, doesn't do anything.
    * @param range Range to be removed.
    */
-  public synchronized void remove(Range range) {
+  synchronized void remove(Range range) {
     if(range.isEmpty()) {
       return;
     }
@@ -177,7 +177,8 @@
     }
   }
   
-  public void readFields(DataInput in) throws IOException {
+  public synchronized void readFields(DataInput in) throws IOException {
+    indicesCount = in.readLong();
     ranges = new TreeSet<Range>();
     int size = in.readInt();
     for(int i=0;i<size;i++) {
@@ -187,7 +188,8 @@
     }
   }
 
-  public void write(DataOutput out) throws IOException {
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeLong(indicesCount);
     out.writeInt(ranges.size());
     Iterator<Range> it = ranges.iterator();
     while(it.hasNext()) {
@@ -215,7 +217,7 @@
     private long startIndex;
     private long length;
         
-    public Range(long startIndex, long length) {
+    Range(long startIndex, long length) {
       if(length<0) {
         throw new RuntimeException("length can't be negative");
       }
@@ -223,7 +225,7 @@
       this.length = length;
     }
     
-    public Range() {
+    Range() {
       this(0,0);
     }
     
@@ -231,7 +233,7 @@
      * Get the start index. Start index in inclusive.
      * @return startIndex. 
      */
-    public long getStartIndex() {
+    long getStartIndex() {
       return startIndex;
     }
     
@@ -239,7 +241,7 @@
      * Get the end index. End index is exclusive.
      * @return endIndex.
      */
-    public long getEndIndex() {
+    long getEndIndex() {
       return startIndex + length;
     }
     
@@ -247,7 +249,7 @@
     * Get Length.
     * @return length
     */
-    public long getLength() {
+    long getLength() {
       return length;
     }
     
@@ -256,7 +258,7 @@
      * @return <code>true</code> if empty
      *         <code>false</code> otherwise.
      */
-    public boolean isEmpty() {
+    boolean isEmpty() {
       return length==0;
     }
     
@@ -299,17 +301,25 @@
   /**
    * Index Iterator which skips the stored ranges.
    */
-  private class SkipRangeIterator implements Iterator<Long> {
-    Iterator<Range> rangeIterator = ranges.iterator();
+  static class SkipRangeIterator implements Iterator<Long> {
+    Iterator<Range> rangeIterator;
     Range range = new Range();
     long currentIndex = -1;
     
     /**
+     * Constructor
+     * @param rangeIterator the iterator which gives the ranges.
+     */
+    SkipRangeIterator(Iterator<Range> rangeIterator) {
+      this.rangeIterator = rangeIterator;
+    }
+    
+    /**
      * Returns true till the index reaches Long.MAX_VALUE.
      * @return <code>true</code> next index exists.
      *         <code>false</code> otherwise.
      */
-    public boolean hasNext() {
+    public synchronized boolean hasNext() {
       return currentIndex<Long.MAX_VALUE;
     }
     
@@ -339,6 +349,15 @@
     }
     
     /**
+     * Get whether all the ranges have been skipped.
+     * @return <code>true</code> if all ranges have been skipped.
+     *         <code>false</code> otherwise.
+     */
+    synchronized boolean skippedAllRanges() {
+      return !rangeIterator.hasNext() && currentIndex>=range.getEndIndex();
+    }
+    
+    /**
      * Remove is not supported. Doesn't apply.
      */
     public void remove() {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Aug 29 01:19:36 2008
@@ -73,6 +73,7 @@
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_GROUPS,
     REDUCE_SKIPPED_RECORDS
   }
   

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Aug 29 01:19:36
2008
@@ -179,6 +179,7 @@
   void init(JobID jobId) {
     this.startTime = System.currentTimeMillis();
     this.id = new TaskID(jobId, isMapTask(), partition);
+    this.skipping = startSkipping();
   }
 
   ////////////////////////////////////
@@ -489,10 +490,7 @@
       machinesWhereFailed.add(trackerHostName);
       LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
       failedRanges.add(status.getNextRecordRange());
-      if(SkipBadRecords.getEnabled(conf) && 
-          numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
-        skipping = true;
-      }
+      skipping = startSkipping();
     } else {
       numKilledTasks++;
     }
@@ -502,6 +500,17 @@
       kill();
     }
   }
+  
+  /**
+   * Get whether to start skipping mode. 
+   */
+  private boolean startSkipping() {
+    if(SkipBadRecords.getEnabled(conf) && 
+        numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
+      return true;
+    }
+    return false;
+  }
 
   /**
    * Finalize the <b>completed</b> task; note that this might not be the first


Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Fri Aug
29 01:19:36 2008
@@ -13,5 +13,6 @@
 REDUCE_INPUT_RECORDS.name=     Reduce input records
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
+REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
 
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java Fri Aug 29 01:19:36
2008
@@ -35,7 +35,9 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestBadRecords extends ClusterMapReduceTestCase {
   
@@ -71,6 +73,7 @@
     conf.setNumReduceTasks(1);
     conf.setInt("mapred.task.timeout", 30*1000);
     
+    SkipBadRecords.setAttemptsToStartSkipping(conf,0);
     //the no of attempts to successfully complete the task depends 
     //on the no of bad records.
     conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
@@ -105,6 +108,59 @@
     throws Exception{
     LOG.info(runningJob.getCounters().toString());
     assertTrue(runningJob.isSuccessful());
+    
+    //validate counters
+    Counters counters = runningJob.getCounters();
+    assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
+        getCounter(),mapperBadRecords.size());
+    
+    int mapRecs = input.size() - mapperBadRecords.size();
+    assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
+        getCounter(),mapRecs);
+    assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
+        getCounter(),mapRecs);
+    
+    int redRecs = mapRecs - redBadRecords.size();
+    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
+        getCounter(),redBadRecords.size());
+    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
+        getCounter(),redBadRecords.size());
+    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
+        getCounter(),redRecs);
+    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
+        getCounter(),redRecs);
+    assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
+        getCounter(),redRecs);
+    
+    //validate skipped records
+    Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+    Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
+    List<String> mapSkipped = new ArrayList<String>();
+    List<String> redSkipped = new ArrayList<String>();
+    for(Path skipPath : skips) {
+      LOG.info("skipPath: " + skipPath);
+      
+      SequenceFile.Reader reader = new SequenceFile.Reader(
+          getFileSystem(), skipPath, conf);
+      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+      Object value = ReflectionUtils.newInstance(reader.getValueClass(), 
+          conf);
+      key = reader.next(key);
+      while(key!=null) {
+        value = reader.getCurrentValue(value);
+        LOG.debug("key:"+key+" value:"+value.toString());
+        if(skipPath.getName().contains("_r_")) {
+          redSkipped.add(value.toString());
+        } else {
+          mapSkipped.add(value.toString());
+        }
+        key = reader.next(key);
+      }
+      reader.close();
+    }
+    assertTrue(mapSkipped.containsAll(mapperBadRecords));
+    assertTrue(redSkipped.containsAll(redBadRecords));
+    
     Path[] outputFiles = FileUtil.stat2Paths(
         getFileSystem().listStatus(getOutputDir(),
         new OutputLogFilter()));



Mime
View raw message