hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r722575 - in /hadoop/core/branches/branch-0.18: CHANGES.txt conf/hadoop-default.xml src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Date Tue, 02 Dec 2008 19:11:56 GMT
Author: cdouglas
Date: Tue Dec  2 11:11:55 2008
New Revision: 722575

URL: http://svn.apache.org/viewvc?rev=722575&view=rev
Log:
HADOOP-4714. Report status between merges and make the number of records
between progress reports configurable. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/conf/hadoop-default.xml
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=722575&r1=722574&r2=722575&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Dec  2 11:11:55 2008
@@ -57,6 +57,9 @@
    
    HADOOP-4635. Fix a memory leak in fuse dfs. (pete wyckoff via mahadev)
 
+    HADOOP-4714. Report status between merges and make the number of records
+    between progress reports configurable. (Jothi Padmanabhan via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.18/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/conf/hadoop-default.xml?rev=722575&r1=722574&r2=722575&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/conf/hadoop-default.xml (original)
+++ hadoop/core/branches/branch-0.18/conf/hadoop-default.xml Tue Dec  2 11:11:55 2008
@@ -1260,4 +1260,12 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.merge.recordsBeforeProgress</name>
+  <value>10000</value>
+  <description> The number of records to process during merge before
+   sending a progress notification to the TaskTracker.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=722575&r1=722574&r2=722575&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Dec  2 11:11:55 2008
@@ -1054,7 +1054,7 @@
               new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
           if (null == combinerClass || job.getCombineOnceOnly() ||
               numSpills < minSpillsForCombine) {
-            Merger.writeFile(kvIter, writer, reporter);
+            Merger.writeFile(kvIter, writer, reporter, job);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=722575&r1=722574&r2=722575&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Dec  2 11:11:55 2008
@@ -43,8 +43,6 @@
 class Merger {  
   private static final Log LOG = LogFactory.getLog(Merger.class);
   
-  private static final long PROGRESS_BAR = 10000;
-
   // Local directories
   private static LocalDirAllocator lDirAlloc = 
     new LocalDirAllocator("mapred.local.dir");
@@ -78,13 +76,17 @@
 
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
-                 Progressable progressable) 
+                 Progressable progressable, Configuration conf)
   throws IOException {
+
     long recordCtr = 0;
+    long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
+         10000);
+
     while(records.next()) {
       writer.append(records.getKey(), records.getValue());
       
-      if ((++recordCtr % PROGRESS_BAR) == 0) {
+      if ((recordCtr++ % progressBar) == 0) {
         progressable.progress();
       }
     }
@@ -372,7 +374,7 @@
 
           Writer<K, V> writer = 
             new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
-          writeFile(this, writer, reporter);
+          writeFile(this, writer, reporter, conf);
           writer.close();
           
           //we finished one single level merge; now clean up the priority 

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=722575&r1=722574&r2=722575&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Dec  2 11:11:55 2008
@@ -2032,7 +2032,7 @@
                                   true, ioSortFactor, tmpDir, 
                                   conf.getOutputKeyComparator(), reporter);
               
-              Merger.writeFile(iter, writer, reporter);
+              Merger.writeFile(iter, writer, reporter, conf);
               writer.close();
             } catch (Exception e) {
               localFileSys.delete(outputPath, true);
@@ -2128,7 +2128,7 @@
                                conf.getOutputKeyComparator(), reporter);
           
           if (null == combinerClass) {
-            Merger.writeFile(rIter, writer, reporter);
+            Merger.writeFile(rIter, writer, reporter, conf);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(rIter, reduceCombineInputCounter);



Mime
View raw message