hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1152964 - in /hadoop/common/trunk/mapreduce: ./ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
Date Mon, 01 Aug 2011 22:53:09 GMT
Author: acmurthy
Date: Mon Aug  1 22:53:08 2011
New Revision: 1152964

URL: http://svn.apache.org/viewvc?rev=1152964&view=rev
Log:
MAPREDUCE-2187. Reporter sends progress during sort/merge. Contributed by Anupam Seth.

Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/java/mapred-default.xml
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Mon Aug  1 22:53:08 2011
@@ -40,6 +40,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
+    acmurthy) 
+
     MAPREDUCE-2365. Add counters to track bytes (read,written) via 
     File(Input,Output)Format. (Siddharth Seth via acmurthy)
  

Modified: hadoop/common/trunk/mapreduce/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/mapred-default.xml?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/mapred-default.xml (original)
+++ hadoop/common/trunk/mapreduce/src/java/mapred-default.xml Mon Aug  1 22:53:08 2011
@@ -1042,6 +1042,14 @@
 <!-- End of TaskTracker DistributedCache configuration -->
 
 <property>
+  <name>mapreduce.task.combine.progress.records</name>
+  <value>10000</value>
+  <description> The number of records to process during combine output collection 
+   before sending a progress notification to the TaskTracker.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.task.merge.progress.records</name>
   <value>10000</value>
   <description> The number of records to process during merge before

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Mon Aug  1 22:53:08 2011
@@ -946,7 +946,7 @@ class MapTask extends Task {
       if (combinerRunner != null) {
         final Counters.Counter combineOutputCounter =
           reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
+        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
       } else {
         combineCollector = null;
       }

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Aug  1 22:53:08 2011
@@ -352,7 +352,7 @@ public class ReduceTask extends Task {
       Class combinerClass = conf.getCombinerClass();
       CombineOutputCollector combineCollector = 
         (null != combinerClass) ? 
-            new CombineOutputCollector(reduceCombineOutputCounter) : null;
+ 	     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
 
       Shuffle shuffle = 
         new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, 

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Mon Aug  1 22:53:08 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
@@ -79,6 +80,7 @@ abstract public class Task implements Wr
     LogFactory.getLog(Task.class);
 
   public static String MERGED_OUTPUT_PREFIX = ".merged";
+  public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
 
   /**
    * Counters to measure the usage of the different file systems.
@@ -1176,16 +1178,26 @@ abstract public class Task implements Wr
   implements OutputCollector<K, V> {
     private Writer<K, V> writer;
     private Counters.Counter outCounter;
-    public CombineOutputCollector(Counters.Counter outCounter) {
+    private Progressable progressable;
+    private long progressBar;
+
+    public CombineOutputCollector(Counters.Counter outCounter, Progressable progressable, Configuration conf) {
       this.outCounter = outCounter;
+      this.progressable=progressable;
+      progressBar = conf.getLong(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS);
     }
+    
     public synchronized void setWriter(Writer<K, V> writer) {
       this.writer = writer;
     }
+
     public synchronized void collect(K key, V value)
         throws IOException {
       outCounter.increment(1);
       writer.append(key, value);
+      if ((outCounter.getValue() % progressBar) == 0) {
+        progressable.progress();
+      }
     }
   }
 

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1152964&r1=1152963&r2=1152964&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Aug  1 22:53:08 2011
@@ -260,6 +260,8 @@ public interface MRJobConfig {
 
   public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
 
+  public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
 
   public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";



Mime
View raw message