hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r789232 - in /hadoop/common/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Date Mon, 29 Jun 2009 06:44:46 GMT
Author: cdouglas
Date: Mon Jun 29 06:44:46 2009
New Revision: 789232

URL: http://svn.apache.org/viewvc?rev=789232&view=rev
Log:
MAPREDUCE-179. Update progress in new RecordReaders.

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Mon Jun 29 06:44:46 2009
@@ -156,6 +156,8 @@
     MAPREDUCE-657. Fix hardcoded filesystem problem in CompletedJobStatusStore.
     (Amar Kamat via sharad)
 
+    MAPREDUCE-179. Update progress in new RecordReaders. (cdouglas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Jun 29 06:44:46 2009
@@ -380,10 +380,12 @@
     extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+    private final TaskReporter reporter;
     
     NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
                             TaskReporter reporter) {
       this.real = real;
+      this.reporter = reporter;
       this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
     }
 
@@ -420,6 +422,7 @@
       if (result) {
         inputRecordCounter.increment(1);
       }
+      reporter.setProgress(getProgress());
       return result;
     }
   }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=789232&r1=789231&r2=789232&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Mon Jun 29 06:44:46 2009
@@ -37,9 +37,12 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
@@ -93,6 +96,33 @@
     }
   }
 
+  public static class TrackingTextInputFormat extends TextInputFormat {
+
+    public static class MonoProgressRecordReader extends LineRecordReader {
+      private float last = 0.0f;
+      private boolean progressCalled = false;
+      @Override
+      public float getProgress() {
+        progressCalled = true;
+        final float ret = super.getProgress();
+        assertTrue("getProgress decreased", ret >= last);
+        last = ret;
+        return ret;
+      }
+      @Override
+      public synchronized void close() throws IOException {
+        assertTrue("getProgress never called", progressCalled);
+        super.close();
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new MonoProgressRecordReader();
+    }
+  }
+
   private void runWordCount(Configuration conf
                             ) throws IOException,
                                      InterruptedException,
@@ -109,6 +139,7 @@
     job.setReducerClass(IntSumReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
     FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
     assertTrue(job.waitForCompletion(false));



Mime
View raw message