hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r696640 - in /hadoop/core/trunk: ./ conf/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/
Date: Thu, 18 Sep 2008 11:48:00 GMT
Author: ddas
Date: Thu Sep 18 04:47:59 2008
New Revision: 696640

URL: http://svn.apache.org/viewvc?rev=696640&view=rev
Log:
HADOOP-3829. Narrow down skipped records based on a user-acceptable value. Contributed by Sharad Agarwal.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Sep 18 04:47:59 2008
@@ -172,6 +172,9 @@
     and change DistCp to use file checksum for comparing src and dst files
     (szetszwo)
 
+    HADOOP-3829. Narrow down skipped records based on a user-acceptable value.
+    (Sharad Agarwal via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Sep 18 04:47:59 2008
@@ -1203,15 +1203,6 @@
   </property>
   
   <property>
-    <name>mapred.skip.mode.enabled</name>
-    <value>false</value>
-    <description> Indicates whether skipping of bad records is enabled or not.
-    If enabled the framework will try to find bad records and skip  
-    them on further attempts.
-    </description>
-  </property>
-  
-  <property>
     <name>mapred.skip.attempts.to.start.skipping</name>
     <value>2</value>
     <description> The number of Task attempts AFTER which skip mode 
@@ -1227,7 +1218,7 @@
     <name>mapred.skip.map.auto.incr.proc.count</name>
     <value>true</value>
     <description> The flag which if set to true, 
-    Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+    SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS is incremented 
     by MapRunner after invoking the map function. This value must be set to 
     false for applications which process the records asynchronously 
     or buffer the input records. For example streaming. 
@@ -1239,13 +1230,52 @@
     <name>mapred.skip.reduce.auto.incr.proc.count</name>
     <value>true</value>
     <description> The flag which if set to true, 
-    Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+    SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS is incremented 
     by framework after invoking the reduce function. This value must be set to 
     false for applications which process the records asynchronously 
     or buffer the input records. For example streaming. 
     In such cases applications should increment this counter on their own.
     </description>
   </property>
+  
+  <property>
+    <name>mapred.skip.out.dir</name>
+    <value></value>
+    <description> If no value is specified here, the skipped records are 
+    written to the output directory at _logs/skip.
+    The user can stop writing skipped records by giving the value "none". 
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.skip.map.max.skip.records</name>
+    <value>0</value>
+    <description> The number of acceptable skip records surrounding the bad 
+    record PER bad record in the mapper. The number includes the bad record 
+    as well.
+    To turn off detection/skipping of bad records, set the value to 0.
+    The framework tries to narrow down the skipped range by retrying 
+    until this threshold is met OR all attempts get exhausted for this task. 
+    Set the value to Long.MAX_VALUE to indicate that the framework need not 
+    try to narrow down; whatever records (depending on the application) get 
+    skipped are acceptable.
+    </description>
+  </property>
+  
+  <property>
+    <name>mapred.skip.reduce.max.skip.groups</name>
+    <value>0</value>
+    <description> The number of acceptable skip groups surrounding the bad 
+    group PER bad group in the reducer. The number includes the bad group 
+    as well.
+    To turn off detection/skipping of bad groups, set the value to 0.
+    The framework tries to narrow down the skipped range by retrying 
+    until this threshold is met OR all attempts get exhausted for this task. 
+    Set the value to Long.MAX_VALUE to indicate that the framework need not 
+    try to narrow down; whatever groups (depending on the application) get 
+    skipped are acceptable.
+    </description>
+  </property>
 
 <!-- ipc properties -->
 

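For illustration, here is a minimal sketch (not part of this commit; the class
name and chosen values are hypothetical) of how a job could set the new
skip-mode properties directly on a JobConf:

    import org.apache.hadoop.mapred.JobConf;

    public class SkipModeConfExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Allow at most one record to be lost around each bad map record;
        // 0 (the default) disables detection/skipping of bad records.
        conf.setLong("mapred.skip.map.max.skip.records", 1);
        // Same threshold for reduce-side key groups.
        conf.setLong("mapred.skip.reduce.max.skip.groups", 1);
        // Kick off skip mode after two failed attempts (the default).
        conf.setInt("mapred.skip.attempts.to.start.skipping", 2);
        // "none" suppresses writing skipped records to _logs/skip.
        conf.set("mapred.skip.out.dir", "none");
      }
    }

Setting either max.skip value to Long.MAX_VALUE instead tells the framework to
accept whatever gets skipped without trying to narrow the range down.
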
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Thu Sep 18 04:47:59 2008
@@ -51,7 +51,7 @@
     Arrays.asList("hey022","hey023","hey099");
   
   private static final List<String> REDUCER_BAD_RECORDS = 
-    Arrays.asList("hey001","hey024");
+    Arrays.asList("hey001","hey018");
   
   private static final String badMapper = 
     StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
@@ -86,6 +86,33 @@
     throws Exception {
     LOG.info(runningJob.getCounters().toString());
     assertTrue(runningJob.isSuccessful());
+    
+    if(validateCount) {
+      //validate counters
+      String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
+      Counters counters = runningJob.getCounters();
+      assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
+          getCounter(),MAPPER_BAD_RECORDS.size());
+      
+      int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
+      assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
+          getCounter(),mapRecs);
+      assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
+          getCounter(),mapRecs);
+      
+      int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
+      assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
+          getCounter(),REDUCER_BAD_RECORDS.size());
+      assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
+          getCounter(),REDUCER_BAD_RECORDS.size());
+      assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
+          getCounter(),redRecs);
+      assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
+          getCounter(),redRecs);
+      assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
+          getCounter(),redRecs);
+    }
+    
     List<String> badRecs = new ArrayList<String>();
     badRecs.addAll(MAPPER_BAD_RECORDS);
     badRecs.addAll(REDUCER_BAD_RECORDS);
@@ -118,7 +145,7 @@
     }
   }
 
-  public void testDisableSkip() throws Exception {
+  public void testSkip() throws Exception {
     JobConf clusterConf = createJobConf();
     createInput();
     int attSkip =0;
@@ -127,6 +154,7 @@
     //on the no of bad records.
     int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
     int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
+    
     String[] args =  new String[] {
       "-input", (new Path(getInputDir(), "text.txt")).toString(),
       "-output", getOutputDir().toString(),
@@ -135,9 +163,11 @@
       "-verbose",
       "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
       "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
+      "-jobconf", "mapred.skip.out.dir=none",
       "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
       "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
-      "-jobconf", "mapred.skip.mode.enabled=false",
+      "-jobconf", "mapred.skip.map.max.skip.records="+Long.MAX_VALUE,
+      "-jobconf", "mapred.skip.reduce.max.skip.groups="+Long.MAX_VALUE,
       "-jobconf", "mapred.map.tasks=1",
       "-jobconf", "mapred.reduce.tasks=1",
       "-jobconf", "mapred.task.timeout=30000",
@@ -151,19 +181,14 @@
     };
     StreamJob job = new StreamJob(args, false);      
     job.go();
-    assertFalse(job.running_.isSuccessful());
+    validateOutput(job.running_, false);
+    //validate that there is no skip directory as it has been set to "none"
+    assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
   }
   
-  public void testSkip() throws Exception {
-    JobConf clusterConf = createJobConf();
+  public void testNarrowDown() throws Exception {
     createInput();
-    int attSkip =0;
-    SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
-    //the no of attempts to successfully complete the task depends 
-    //on the no of bad records.
-    int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
-    int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
-    
+    JobConf clusterConf = createJobConf();
     String[] args =  new String[] {
       "-input", (new Path(getInputDir(), "text.txt")).toString(),
       "-output", getOutputDir().toString(),
@@ -171,10 +196,11 @@
       "-reducer", badReducer,
       "-verbose",
       "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
-      "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
-      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
-      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
-      "-jobconf", "mapred.skip.mode.enabled=true",
+      "-jobconf", "mapred.skip.attempts.to.start.skipping=1",
+      "-jobconf", "mapred.map.max.attempts=12",
+      "-jobconf", "mapred.reduce.max.attempts=8",
+      "-jobconf", "mapred.skip.map.max.skip.records=1",
+      "-jobconf", "mapred.skip.reduce.max.skip.groups=1",
       "-jobconf", "mapred.map.tasks=1",
       "-jobconf", "mapred.reduce.tasks=1",
       "-jobconf", "mapred.task.timeout=30000",
@@ -188,7 +214,9 @@
     };
     StreamJob job = new StreamJob(args, false);      
     job.go();
-    validateOutput(job.running_, false);
+    
+    validateOutput(job.running_, true);
+    assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
   }
   
   static class App{
@@ -198,9 +226,9 @@
       if(args.length>0) {
         isReducer = Boolean.parseBoolean(args[0]);
       }
-      String counter = Counters.Application.MAP_PROCESSED_RECORDS;
+      String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
       if(isReducer) {
-        counter = Counters.Application.REDUCE_PROCESSED_RECORDS;
+        counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
       }
       BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
       String line;
@@ -209,8 +237,8 @@
         processLine(line);
         count++;
         if(count>=10) {
-          System.err.println("reporter:counter:"+Counters.Application.GROUP+","+
-              counter+","+count);
+          System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
+              ","+counter+","+count);
           count = 0;
         }
       }

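As the mapred.skip.*.auto.incr.proc.count descriptions above note, streaming
applications must report processed records themselves. The App class above does
exactly that; the following standalone sketch (hypothetical class name,
identity mapper assumed) isolates the stderr counter protocol involved:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class ManualCounterMapper {
      public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);  // identity map: pass the record through
          // Report one fully processed record; the group and counter names
          // match SkipBadRecords.COUNTER_GROUP and
          // SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.
          System.err.println(
              "reporter:counter:SkippingTaskCounters,MapProcessedRecords,1");
        }
      }
    }
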
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Thu Sep 18 04:47:59 2008
@@ -747,14 +747,4 @@
     }
     return isEqual;
   }
-  
-  public static class Application {
-    //special counters which are written by the application and are 
-    //used by the framework.
-    public static final String GROUP = "ApplicationCounters";
-    public static final String MAP_PROCESSED_RECORDS = "MapProcessedRecords";
-    public static final String REDUCE_PROCESSED_RECORDS = 
-      "ReduceProcessedRecords";
-    
-  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapRunner.java Thu Sep 18 04:47:59 2008
@@ -32,7 +32,8 @@
   @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
-    this.incrProcCount = job.getBoolean("mapred.skip.on", false) && 
+    //increment processed counter only if skipping feature is enabled
+    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
       SkipBadRecords.getAutoIncrMapperProcCount(job);
   }
 
@@ -48,8 +49,8 @@
         // map pair to output
         mapper.map(key, value, output, reporter);
         if(incrProcCount) {
-          reporter.incrCounter(Counters.Application.GROUP, 
-              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
+              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
         }
       }
     } finally {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Sep 18 04:47:59 2008
@@ -201,6 +201,7 @@
   class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
     private SkipRangeIterator skipIt;
     private SequenceFile.Writer skipWriter;
+    private boolean toWriteSkipRecs;
     private TaskUmbilicalProtocol umbilical;
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
@@ -210,24 +211,33 @@
       super(raw,counters);
       this.umbilical = umbilical;
       this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
-      skipIt = getFailedRanges().skipRangeIterator();
+      this.toWriteSkipRecs = toWriteSkipRecs() &&  
+        SkipBadRecords.getSkipOutputPath(conf)!=null;
+      skipIt = getSkipRanges().skipRangeIterator();
     }
     
     public synchronized boolean next(K key, V value)
     throws IOException {
+      if(!skipIt.hasNext()) {
+        LOG.warn("Further records got skipped.");
+        return false;
+      }
       boolean ret = moveToNext(key, value);
       long nextRecIndex = skipIt.next();
-      long skip = nextRecIndex - recIndex;
-      for(int i=0;i<skip && ret;i++) {
-      	writeSkippedRec(key, value);
-        ret = moveToNext(key, value);
+      long skip = 0;
+      while(recIndex<nextRecIndex && ret) {
+        if(toWriteSkipRecs) {
+          writeSkippedRec(key, value);
+        }
+        ret = moveToNext(key, value);
+        skip++;
       }
       //close the skip writer once all the ranges are skipped
       if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
         skipWriter.close();
       }
       skipRecCounter.increment(skip);
-      reportNextRecordRange(umbilical, nextRecIndex);
+      reportNextRecordRange(umbilical, recIndex);
       if (ret) {
         incrCounters();
       }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Sep 18 04:47:59 2008
@@ -255,10 +255,12 @@
      private TaskUmbilicalProtocol umbilical;
      private Counters.Counter skipGroupCounter;
      private Counters.Counter skipRecCounter;
-     private long recIndex = -1;
+     private long grpIndex = -1;
      private Class<KEY> keyClass;
      private Class<VALUE> valClass;
      private SequenceFile.Writer skipWriter;
+     private boolean toWriteSkipRecs;
+     private boolean hasNext;
      
      public SkippingReduceValuesIterator(RawKeyValueIterator in,
          RawComparator<KEY> comparator, Class<KEY> keyClass,
@@ -270,9 +272,11 @@
          getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
          getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+       this.toWriteSkipRecs = toWriteSkipRecs() &&  
+         SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
        this.valClass = valClass;
-       skipIt = getFailedRanges().skipRangeIterator();
+       skipIt = getSkipRanges().skipRangeIterator();
        mayBeSkip();
      }
      
@@ -281,26 +285,40 @@
        mayBeSkip();
      }
      
+     boolean more() { 
+       return super.more() && hasNext; 
+     }
+     
      private void mayBeSkip() throws IOException {
-       recIndex++;
-       long nextRecIndex = skipIt.next();
-       long skip = nextRecIndex - recIndex;
+       hasNext = skipIt.hasNext();
+       if(!hasNext) {
+         LOG.warn("Further groups got skipped.");
+         return;
+       }
+       grpIndex++;
+       long nextGrpIndex = skipIt.next();
+       long skip = 0;
        long skipRec = 0;
-       for(int i=0;i<skip && super.more();i++) {
+       while(grpIndex<nextGrpIndex && super.more()) {
          while (hasNext()) {
-           writeSkippedRec(getKey(), moveToNext());
+           VALUE value = moveToNext();
+           if(toWriteSkipRecs) {
+             writeSkippedRec(getKey(), value);
+           }
            skipRec++;
          }
          super.nextKey();
-         recIndex++;
+         grpIndex++;
+         skip++;
        }
+       
        //close the skip writer once all the ranges are skipped
        if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
          skipWriter.close();
        }
        skipGroupCounter.increment(skip);
        skipRecCounter.increment(skipRec);
-       reportNextRecordRange(umbilical, nextRecIndex);
+       reportNextRecordRange(umbilical, grpIndex);
      }
      
      @SuppressWarnings("unchecked")
@@ -390,7 +408,8 @@
     try {
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
-      boolean incrProcCount = isSkipping() &&
+      //increment processed counter only if skipping feature is enabled
+      boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
         SkipBadRecords.getAutoIncrReducerProcCount(job);
       
       ReduceValuesIterator values = isSkipping() ? 
@@ -405,8 +424,8 @@
         reduceInputKeyCounter.increment(1);
         reducer.reduce(values.getKey(), values, collector, reporter);
         if(incrProcCount) {
-          reporter.incrCounter(Counters.Application.GROUP, 
-              Counters.Application.REDUCE_PROCESSED_RECORDS, 1);
+          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
+              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
         }
         values.nextKey();
         values.informReduceProgress();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java Thu Sep 18 04:47:59 2008
@@ -24,10 +24,53 @@
 /**
  * Utility class for skip bad records functionality. It contains various 
  * settings related to skipping of bad records.
+ * 
+ * <p>Hadoop provides an optional mode of execution in which the bad records
+ * are detected and skipped in further attempts.
+ * 
+ * <p>This feature can be used when map/reduce tasks crash deterministically 
+ * on certain input, due to bugs in the map/reduce function. The usual 
+ * course would be to fix these bugs. But sometimes this is not possible; 
+ * perhaps the bug is in third-party libraries for which the source code is 
+ * not available. In such cases the task never reaches completion even with 
+ * multiple attempts, and the complete data for that task is lost.</p>
+ *  
+ * <p>With this feature, only a small portion of data surrounding the 
+ * bad record is lost, which may be acceptable for some user applications.
+ * See {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}.</p>
+ * 
+ * <p>The skipping mode gets kicked off after a certain number of failures; 
+ * see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}.</p>
+ *  
+ * <p>In skipping mode, the map/reduce task maintains the record range 
+ * currently being processed. Before giving the input to the 
+ * map/reduce function, it sends this record range to the TaskTracker. 
+ * If the task crashes, the TaskTracker knows which range was reported 
+ * last; on further attempts that range gets skipped.</p>
  */
 public class SkipBadRecords {
   
-  private static final String ENABLED = "mapred.skip.mode.enabled";
+  /**
+   * Group of special counters which are written by the application and 
+   * used by the framework for detecting bad records. For bad-record 
+   * detection to work, the application must increment these counters.
+   */
+  public static final String COUNTER_GROUP = "SkippingTaskCounters";
+  
+  /**
+   * Number of processed map records.
+   * @see SkipBadRecords#getAutoIncrMapperProcCount(Configuration)
+   */
+  public static final String COUNTER_MAP_PROCESSED_RECORDS = 
+    "MapProcessedRecords";
+  
+  /**
+   * Number of processed reduce groups.
+   * @see SkipBadRecords#getAutoIncrReducerProcCount(Configuration)
+   */
+  public static final String COUNTER_REDUCE_PROCESSED_GROUPS = 
+    "ReduceProcessedGroups";
+  
   private static final String ATTEMPTS_TO_START_SKIPPING = 
     "mapred.skip.attempts.to.start.skipping";
   private static final String AUTO_INCR_MAP_PROC_COUNT = 
@@ -35,39 +78,19 @@
   private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
     "mapred.skip.reduce.auto.incr.proc.count";
   private static final String OUT_PATH = "mapred.skip.out.dir";
+  private static final String MAPPER_MAX_SKIP_RECORDS = 
+    "mapred.skip.map.max.skip.records";
+  private static final String REDUCER_MAX_SKIP_GROUPS = 
+    "mapred.skip.reduce.max.skip.groups";
   
   /**
-   * Is skipping of bad records enabled. If it is enabled 
-   * the framework will try to find bad records and skip  
-   * them on further attempts.
-   * 
-   * @param conf the configuration
-   * @return <code>true</code> if skipping is enabled
-   *         <code>false</code> otherwise.
-   */
-  public static boolean getEnabled(Configuration conf) {
-    return conf.getBoolean(ENABLED, false);
-  }
-  
-  /**
-   * Set whether to enable skipping of bad records. If it is enabled 
-   * the framework will try to find bad records and will 
-   * try to skip them on further attempts.
-   * 
-   * @param conf the configuration
-   * @param enabled boolean to enable/disable skipping 
-   */
-  public static void setEnabled(Configuration conf, boolean enabled) {
-    conf.setBoolean(ENABLED, enabled);
-  }
-
-  /**
    * Get the number of Task attempts AFTER which skip mode 
    * will be kicked off. When skip mode is kicked off, the 
    * tasks reports the range of records which it will process 
    * next to the TaskTracker. So that on failures, TT knows which 
    * ones are possibly the bad records. On further executions, 
    * those are skipped.
+   * Default value is 2.
    * 
    * @param conf the configuration
    * @return attemptsToStartSkipping no of task attempts
@@ -83,6 +106,7 @@
    * next to the TaskTracker. So that on failures, TT knows which 
    * ones are possibly the bad records. On further executions, 
    * those are skipped.
+   * Default value is 2.
    * 
    * @param conf the configuration
    * @param attemptsToStartSkipping no of task attempts
@@ -94,15 +118,16 @@
 
   /**
    * Get the flag which if set to true, 
-   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
    * by MapRunner after invoking the map function. This value must be set to 
    * false for applications which process the records asynchronously 
    * or buffer the input records. For example streaming. 
    * In such cases applications should increment this counter on their own.
+   * Default value is true.
    * 
    * @param conf the configuration
    * @return <code>true</code> if auto increment 
-   *                           Counters.Application.MAP_PROCESSED_RECORDS.
+   *                       {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
    *         <code>false</code> otherwise.
    */
   public static boolean getAutoIncrMapperProcCount(Configuration conf) {
@@ -111,15 +136,16 @@
   
   /**
    * Set the flag which if set to true, 
-   * Counters.Application.MAP_PROCESSED_RECORDS is incremented 
+   * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
    * by MapRunner after invoking the map function. This value must be set to 
    * false for applications which process the records asynchronously 
    * or buffer the input records. For example streaming. 
    * In such cases applications should increment this counter on their own.
+   * Default value is true.
    * 
    * @param conf the configuration
    * @param autoIncr whether to auto increment 
-   *        Counters.Application.MAP_PROCESSED_RECORDS.
+   *        {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
    */
   public static void setAutoIncrMapperProcCount(Configuration conf, 
       boolean autoIncr) {
@@ -128,15 +154,16 @@
   
   /**
    * Get the flag which if set to true, 
-   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
    * by framework after invoking the reduce function. This value must be set to 
    * false for applications which process the records asynchronously 
    * or buffer the input records. For example streaming. 
    * In such cases applications should increment this counter on their own.
+   * Default value is true.
    * 
    * @param conf the configuration
    * @return <code>true</code> if auto increment 
-   *                           Counters.Application.REDUCE_PROCESSED_RECORDS.
+   *                    {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
    *         <code>false</code> otherwise.
    */
   public static boolean getAutoIncrReducerProcCount(Configuration conf) {
@@ -145,15 +172,16 @@
   
   /**
    * Set the flag which if set to true, 
-   * Counters.Application.REDUCE_PROCESSED_RECORDS is incremented 
+   * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
    * by framework after invoking the reduce function. This value must be set to 
    * false for applications which process the records asynchronously 
    * or buffer the input records. For example streaming. 
    * In such cases applications should increment this counter on their own.
+   * Default value is true.
    * 
    * @param conf the configuration
    * @param autoIncr whether to auto increment 
-   *        Counters.Application.REDUCE_PROCESSED_RECORDS.
+   *        {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
    */
   public static void setAutoIncrReducerProcCount(Configuration conf, 
       boolean autoIncr) {
@@ -163,6 +191,8 @@
   /**
    * Get the directory to which skipped records are written. By default it is 
    * the sub directory of the output _logs directory.
+   * The user can stop writing skipped records by setting the value to null.
+   * 
    * @param conf the configuration.
    * @return path skip output directory. Null is returned if this is not set 
    * and output directory is also not set.
@@ -170,6 +200,9 @@
   public static Path getSkipOutputPath(Configuration conf) {
     String name =  conf.get(OUT_PATH);
     if(name!=null) {
+      if("none".equals(name)) {
+        return null;
+      }
       return new Path(name);
     }
     Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
@@ -180,11 +213,96 @@
   /**
    * Set the directory to which skipped records are written. By default it is 
    * the sub directory of the output _logs directory.
+   * The user can stop writing skipped records by setting the value to null.
+   * 
    * @param conf the configuration.
    * @param path skip output directory path
    */
   public static void setSkipOutputPath(JobConf conf, Path path) {
-    conf.set(OUT_PATH, path.toString());
+    String pathStr = null;
+    if(path==null) {
+      pathStr = "none";
+    } else {
+      pathStr = path.toString();
+    }
+    conf.set(OUT_PATH, pathStr);
+  }
+  
+  /**
+   * Get the number of acceptable skip records surrounding the bad record PER 
+   * bad record in the mapper. The number includes the bad record as well.
+   * To turn off detection/skipping of bad records, set the value to 0.
+   * The framework tries to narrow down the skipped range by retrying 
+   * until this threshold is met OR all attempts get exhausted for this task. 
+   * Set the value to Long.MAX_VALUE to indicate that the framework need not 
+   * try to narrow down; whatever records (depending on the application) get 
+   * skipped are acceptable.
+   * Default value is 0.
+   * 
+   * @param conf the configuration
+   * @return maxSkipRecs acceptable skip records.
+   */
+  public static long getMapperMaxSkipRecords(Configuration conf) {
+    return conf.getLong(MAPPER_MAX_SKIP_RECORDS, 0);
+  }
+  
+  /**
+   * Set the number of acceptable skip records surrounding the bad record PER 
+   * bad record in the mapper. The number includes the bad record as well.
+   * To turn off detection/skipping of bad records, set the value to 0.
+   * The framework tries to narrow down the skipped range by retrying 
+   * until this threshold is met OR all attempts get exhausted for this task. 
+   * Set the value to Long.MAX_VALUE to indicate that the framework need not 
+   * try to narrow down; whatever records (depending on the application) get 
+   * skipped are acceptable.
+   * Default value is 0.
+   * 
+   * @param conf the configuration
+   * @param maxSkipRecs acceptable skip records.
+   */
+  public static void setMapperMaxSkipRecords(Configuration conf, 
+      long maxSkipRecs) {
+    conf.setLong(MAPPER_MAX_SKIP_RECORDS, maxSkipRecs);
+  }
+  
+  /**
+   * Get the number of acceptable skip groups surrounding the bad group PER 
+   * bad group in the reducer. The number includes the bad group as well.
+   * To turn off detection/skipping of bad groups, set the value to 0.
+   * The framework tries to narrow down the skipped range by retrying 
+   * until this threshold is met OR all attempts get exhausted for this task. 
+   * Set the value to Long.MAX_VALUE to indicate that the framework need not 
+   * try to narrow down; whatever groups (depending on the application) get 
+   * skipped are acceptable.
+   * Default value is 0.
+   * 
+   * @param conf the configuration
+   * @return maxSkipGrps acceptable skip groups.
+   */
+  public static long getReducerMaxSkipGroups(Configuration conf) {
+    return conf.getLong(REDUCER_MAX_SKIP_GROUPS, 0);
   }
   
+  /**
+   * Set the number of acceptable skip groups surrounding the bad group PER 
+   * bad group in the reducer. The number includes the bad group as well.
+   * To turn off detection/skipping of bad groups, set the value to 0.
+   * The framework tries to narrow down the skipped range by retrying 
+   * until this threshold is met OR all attempts get exhausted for this task. 
+   * Set the value to Long.MAX_VALUE to indicate that the framework need not 
+   * try to narrow down; whatever groups (depending on the application) get 
+   * skipped are acceptable.
+   * Default value is 0.
+   * 
+   * @param conf the configuration
+   * @param maxSkipGrps acceptable skip groups.
+   */
+  public static void setReducerMaxSkipGroups(Configuration conf, 
+      long maxSkipGrps) {
+    conf.setLong(REDUCER_MAX_SKIP_GROUPS, maxSkipGrps);
+  }
 }

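Taken together, the new setters replace the removed getEnabled/setEnabled pair.
A minimal sketch (not part of this commit; class name hypothetical) of driving
the feature through the SkipBadRecords API rather than raw property strings:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SkipBadRecords;

    public class SkipApiExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Any positive threshold enables skipping; Long.MAX_VALUE tells the
        // framework to accept whatever gets skipped without narrowing down.
        SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
        SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
        SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
        // Passing null stores "none", which disables writing skipped records.
        SkipBadRecords.setSkipOutputPath(conf, null);
      }
    }
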
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Thu Sep 18 04:47:59 2008
@@ -61,6 +61,14 @@
   }
   
   /**
+   * Get the sorted set of ranges.
+   * @return ranges
+   */
+  synchronized SortedSet<Range> getRanges() {
+    return ranges;
+  }
+  
+  /**
    * Add the range indices. It is ensured that the added range 
    * doesn't overlap the existing ranges. If it overlaps, the 
    * existing overlapping ranges are removed and a single range 
@@ -304,7 +312,7 @@
   static class SkipRangeIterator implements Iterator<Long> {
     Iterator<Range> rangeIterator;
     Range range = new Range();
-    long currentIndex = -1;
+    long next = -1;
     
     /**
      * Constructor
@@ -312,6 +320,7 @@
      */
     SkipRangeIterator(Iterator<Range> rangeIterator) {
       this.rangeIterator = rangeIterator;
+      doNext();
     }
     
     /**
@@ -320,7 +329,7 @@
      *         <code>false</code> otherwise.
      */
     public synchronized boolean hasNext() {
-      return currentIndex<Long.MAX_VALUE;
+      return next<Long.MAX_VALUE;
     }
     
     /**
@@ -328,22 +337,27 @@
      * @return next index
      */
     public synchronized Long next() {
-      currentIndex++;
-      LOG.debug("currentIndex "+currentIndex +"   "+range);
+      long ci = next;
+      doNext();
+      return ci;
+    }
+    
+    private void doNext() {
+      next++;
+      LOG.debug("currentIndex "+next +"   "+range);
       skipIfInRange();
-      while(currentIndex>=range.getEndIndex() && rangeIterator.hasNext()) {
+      while(next>=range.getEndIndex() && rangeIterator.hasNext()) {
         range = rangeIterator.next();
         skipIfInRange();
       }
-      return currentIndex;
     }
     
     private void skipIfInRange() {
-      if(currentIndex>=range.getStartIndex() && 
-          currentIndex<range.getEndIndex()) {
+      if(next>=range.getStartIndex() && 
+          next<range.getEndIndex()) {
         //need to skip the range
-        LOG.warn("Skipping index " + currentIndex +"-" + range.getEndIndex());
-        currentIndex = range.getEndIndex();
+        LOG.warn("Skipping index " + next +"-" + range.getEndIndex());
+        next = range.getEndIndex();
         
       }
     }
@@ -354,7 +368,7 @@
      *         <code>false</code> otherwise.
      */
     synchronized boolean skippedAllRanges() {
-      return !rangeIterator.hasNext() && currentIndex>=range.getEndIndex();
+      return !rangeIterator.hasNext() && next>range.getEndIndex();
     }
     
     /**

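To make the reworked iterator contract concrete, here is a self-contained toy
re-implementation (a simplification for illustration, not the package-private
SkipRangeIterator itself) showing how successive indices jump over skip ranges:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class SkipIteratorDemo {
      public static void main(String[] args) {
        // each skip range is {startIndex, endIndex}, end exclusive
        List<long[]> ranges = Arrays.asList(new long[]{2, 4}, new long[]{7, 8});
        Iterator<long[]> it = ranges.iterator();
        long[] cur = it.hasNext() ? it.next() : null;
        long next = -1;
        for (int emitted = 0; emitted < 7; emitted++) {
          next++;
          // jump past any skip range that contains the candidate index
          while (cur != null && next >= cur[0] && next < cur[1]) {
            next = cur[1];
            cur = it.hasNext() ? it.next() : null;
          }
          System.out.print(next + " ");   // prints: 0 1 4 5 6 8 9
        }
        System.out.println();
      }
    }
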
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Thu Sep 18 04:47:59 2008
@@ -109,14 +109,15 @@
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
   
-  //failed ranges from previous attempts
-  private SortedRanges failedRanges = new SortedRanges();
+  //skip ranges based on failed ranges from previous attempts
+  private SortedRanges skipRanges = new SortedRanges();
   private boolean skipping = false;
+  private boolean writeSkipRecs = true;
   
   //currently processing record start index
   private volatile long currentRecStartIndex; 
   private Iterator<Long> currentRecIndexIterator = 
-    failedRanges.skipRangeIterator();
+    skipRanges.skipRangeIterator();
   
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
@@ -190,19 +191,31 @@
   }
   
   /**
-   * Get failed ranges.
-   * @return recordsToSkip
+   * Get whether to write skip records.
    */
-  public SortedRanges getFailedRanges() {
-    return failedRanges;
+  protected boolean toWriteSkipRecs() {
+    return writeSkipRecs;
+  }
+      
+  /**
+   * Set whether to write skip records.
+   */
+  protected void setWriteSkipRecs(boolean writeSkipRecs) {
+    this.writeSkipRecs = writeSkipRecs;
+  }
+  
+  /**
+   * Get skipRanges.
+   */
+  public SortedRanges getSkipRanges() {
+    return skipRanges;
   }
 
   /**
-   * Set failed ranges.
-   * @param recordsToSkip
+   * Set skipRanges.
    */
-  public void setFailedRanges(SortedRanges failedRanges) {
-    this.failedRanges = failedRanges;
+  public void setSkipRanges(SortedRanges skipRanges) {
+    this.skipRanges = skipRanges;
   }
 
   /**
@@ -236,9 +249,10 @@
     taskId.write(out);
     out.writeInt(partition);
     taskStatus.write(out);
-    failedRanges.write(out);
+    skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(cleanupJob);
+    out.writeBoolean(writeSkipRecs);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
@@ -246,11 +260,12 @@
     partition = in.readInt();
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
-    failedRanges.readFields(in);
-    currentRecIndexIterator = failedRanges.skipRangeIterator();
+    skipRanges.readFields(in);
+    currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     cleanupJob = in.readBoolean();
+    writeSkipRecs = in.readBoolean();
   }
 
   @Override
@@ -435,9 +450,9 @@
           if (counters != null) {
             counters.incrCounter(group, counter, amount);
           }
-          if(skipping && Counters.Application.GROUP.equals(group) && (
-              Counters.Application.MAP_PROCESSED_RECORDS.equals(counter) ||
-              Counters.Application.REDUCE_PROCESSED_RECORDS.equals(counter))) {
+          if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && (
+              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) ||
+              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) {
             //if application reports the processed records, move the 
             //currentRecStartIndex to the next.
             //currentRecStartIndex is the start index which has not yet been 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Sep 18 04:47:59 2008
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.net.Node;
 
 
@@ -77,7 +78,8 @@
   private int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
-  private volatile SortedRanges failedRanges = new SortedRanges();
+  private long maxSkipRecords = 0;
+  private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
   private boolean cleanup = false; 
    
@@ -127,6 +129,7 @@
     this.job = job;
     this.conf = conf;
     this.partition = partition;
+    this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -144,6 +147,7 @@
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
+    this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -467,6 +471,11 @@
       LOG.info("Error from "+taskid+": "+diagInfo);
       addDiagnosticInfo(taskid, diagInfo);
     }
+    
+    if(skipping) {
+      failedRanges.updateState(status);
+    }
+    
     if (oldStatus != null) {
       TaskStatus.State oldState = oldStatus.getRunState();
       TaskStatus.State newState = status.getRunState();
@@ -566,9 +575,13 @@
       if (taskState == TaskStatus.State.FAILED) {
         numTaskFailures++;
         machinesWhereFailed.add(trackerHostName);
-        LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
-        failedRanges.add(status.getNextRecordRange());
-        skipping = startSkipping();
+        if(maxSkipRecords>0) {
+          //skipping feature enabled
+          LOG.debug("TaskInProgress adding " + status.getNextRecordRange());
+          failedRanges.add(status.getNextRecordRange());
+          skipping = startSkipping();
+        }
+
       } else {
         numKilledTasks++;
       }
@@ -584,7 +597,7 @@
    * Get whether to start skipping mode. 
    */
   private boolean startSkipping() {
-    if(SkipBadRecords.getEnabled(conf) && 
+    if(maxSkipRecords>0 && 
         numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
       return true;
     }
@@ -832,8 +845,12 @@
       t.setCleanupTask();
     }
     t.setConf(conf);
-    t.setFailedRanges(failedRanges);
+    LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
+    t.setSkipRanges(failedRanges.getSkipRanges());
     t.setSkipping(skipping);
+    if(failedRanges.isTestAttempt()) {
+      t.setWriteSkipRecs(false);
+    }
     tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
@@ -938,6 +955,100 @@
   public void clearSplit() {
     rawSplit.clearBytes();
   }
+  
+  /**
+   * This class keeps the records to be skipped during further executions 
+   * based on failed records from all the previous attempts.
+   * It also narrows down the skipped records if the range is larger than 
+   * the acceptable value, by dividing the failed range in half. One half 
+   * is then executed in the next attempt (the test attempt), in which only 
+   * the test range gets executed and everything else is skipped. 
+   * Based on the success/failure of the test attempt, the range is divided 
+   * further.
+   */
+  private class FailedRanges {
+    private SortedRanges skipRanges = new SortedRanges();
+    private Divide divide;
+    
+    synchronized SortedRanges getSkipRanges() {
+      if(divide!=null) {
+        return divide.skipRange;
+      }
+      return skipRanges;
+    }
+    
+    synchronized boolean isTestAttempt() {
+      return divide!=null;
+    }
+    
+    synchronized long getIndicesCount() {
+      if(isTestAttempt()) {
+        return divide.skipRange.getIndicesCount();
+      }
+      return skipRanges.getIndicesCount();
+    }
+    
+    synchronized void updateState(TaskStatus status){
+      if (isTestAttempt() && 
+          (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
+        divide.testPassed = true;
+        //since it was the test attempt we need to set it to failed
+        //as it worked only on the test range
+        status.setRunState(TaskStatus.State.FAILED);
+        
+      }
+    }
+    
+    synchronized void add(Range failedRange) {
+      LOG.warn("FailedRange:"+ failedRange);
+      if(divide!=null) {
+        LOG.warn("FailedRange:"+ failedRange +"  test:"+divide.test +
+            "  pass:"+divide.testPassed);
+        if(divide.testPassed) {
+          //test range passed
+          //other range would be bad. test it
+          failedRange = divide.other;
+        }
+        else {
+          //test range failed
+          //other range would be good.
+          failedRange = divide.test;
+        }
+        //reset
+        divide = null;
+      }
+      
+      if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
+        skipRanges.add(failedRange);
+      } else {
+        //start dividing the range to narrow down the skipped
+        //records until maxSkipRecords are met OR all attempts
+        //get exhausted
+        divide = new Divide(failedRange);
+      }
+    }
+    
+    class Divide {
+      private final SortedRanges skipRange;
+      private final Range test;
+      private final Range other;
+      private boolean testPassed;
+      Divide(Range range){
+        long half = range.getLength()/2;
+        test = new Range(range.getStartIndex(), half);
+        other = new Range(test.getEndIndex(), range.getLength()-half);
+        //construct the skip range from the skipRanges
+        skipRange = new SortedRanges();
+        for(Range r : skipRanges.getRanges()) {
+          skipRange.add(r);
+        }
+        skipRange.add(new Range(0,test.getStartIndex()));
+        skipRange.add(new Range(test.getEndIndex(), 
+            (Long.MAX_VALUE-test.getEndIndex())));
+      }
+    }
+    
+  }
 
   TreeMap<TaskAttemptID, String> getActiveTasks() {
     return activeTasks;

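The Divide machinery above amounts to a binary search over the failed range:
each test attempt runs one half of the range; if the attempt fails, the bad
record lies in that half, otherwise it lies in the other half. A standalone
sketch of the narrowing loop (hypothetical values; the bad record's position
stands in for a real deterministic failure):

    public class NarrowDownDemo {
      public static void main(String[] args) {
        long start = 0, length = 1000;   // initial failed range [0, 1000)
        long badRecord = 613;            // assumed deterministic bad record
        long maxSkipRecords = 1;
        int attempts = 0;
        while (length > maxSkipRecords) {
          long half = length / 2;
          attempts++;
          // A test attempt executes only [start, start+half); if it fails,
          // the bad record is in that half, otherwise in the remainder.
          if (badRecord >= start && badRecord < start + half) {
            length = half;
          } else {
            start = start + half;
            length = length - half;
          }
        }
        // prints: skipped range [613,614) after 10 test attempts
        System.out.println("skipped range [" + start + "," + (start + length)
            + ") after " + attempts + " test attempts");
      }
    }
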
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Thu Sep 18 04:47:59 2008
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.MapRunnable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -72,7 +71,8 @@
     }
 
     this.job = jobConf;
-    this.incrProcCount = job.getBoolean("mapred.skip.on", false) && 
+    //increment processed counter only if skipping feature is enabled
+    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
       SkipBadRecords.getAutoIncrMapperProcCount(job);
     this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
         jobConf);
@@ -228,8 +228,8 @@
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
         if(incrProcCount) {
-          reporter.incrCounter(Counters.Application.GROUP, 
-              Counters.Application.MAP_PROCESSED_RECORDS, 1);
+          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
+              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
         }
       } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java Thu Sep 18 04:47:59 2008
@@ -72,7 +72,8 @@
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(1);
     conf.setInt("mapred.task.timeout", 30*1000);
-    SkipBadRecords.setEnabled(conf, true);
+    SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
+    SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
     
     SkipBadRecords.setAttemptsToStartSkipping(conf,0);
     //the no of attempts to successfully complete the task depends 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=696640&r1=696639&r2=696640&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Thu Sep 18 04:47:59 2008
@@ -90,7 +90,6 @@
     FileInputFormat.setInputPaths(conf, new Path("/in"));
     final Path outp = new Path("/out");
     FileOutputFormat.setOutputPath(conf, outp);
-    SkipBadRecords.setEnabled(conf, false);
     RunningJob job = null;
     try {
       job = JobClient.runJob(conf);


