hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r727006 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/ReduceTask.java src/mapred/org/apache/hadoop/mapred/Task.java src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
Date Tue, 16 Dec 2008 10:17:02 GMT
Author: cdouglas
Date: Tue Dec 16 02:17:02 2008
New Revision: 727006

URL: http://svn.apache.org/viewvc?rev=727006&view=rev
Log:
HADOOP-4845. Modify the reduce input byte counter to record only the
compressed size and add a human-readable label. Contributed by Yongqiang He

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 02:17:02 2008
@@ -428,6 +428,9 @@
     HADOOP-3921. Fixed clover (code coverage) target to work with JDK 6.
     (tomwhite via nigel)
 
+    HADOOP-4845. Modify the reduce input byte counter to record only the
+    compressed size and add a human-readable label. (Yongqiang He via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Dec 16 02:17:02 2008
@@ -111,8 +111,8 @@
   private Progress copyPhase;
   private Progress sortPhase;
   private Progress reducePhase;
-  private Counters.Counter reduceInputBytes = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_BYTES);
+  private Counters.Counter reduceShuffleBytes = 
+    getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
   private Counters.Counter reduceInputKeyCounter = 
     getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
@@ -380,7 +380,6 @@
         throw new IOException("Task: " + getTaskID() + 
             " - The reduce copier failed", reduceCopier.mergeThrowable);
       }
-      reduceInputBytes.increment(reduceCopier.reducerInputBytes);
     }
     copyPhase.complete();                         // copy is already complete
     setPhase(TaskStatus.Phase.SORT);
@@ -919,7 +918,7 @@
       
       byte[] data;
       final boolean inMemory;
-      long size;
+      long compressedSize;
       
       public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
                        Configuration conf, Path file, long size) {
@@ -928,14 +927,14 @@
         
         this.conf = conf;
         this.file = file;
-        this.size = size;
+        this.compressedSize = size;
         
         this.data = null;
         
         this.inMemory = false;
       }
       
-      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data) {
+      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
         this.mapId = mapId;
         this.mapAttemptId = mapAttemptId;
         
@@ -943,7 +942,7 @@
         this.conf = null;
         
         this.data = data;
-        this.size = data.length;
+        this.compressedSize = compressedLength;
         
         this.inMemory = true;
       }
@@ -1262,7 +1261,7 @@
         }
         
         // The size of the map-output
-        long bytes = mapOutput.size;
+        long bytes = mapOutput.compressedSize;
         
         // lock the ReduceTask while we do the rename
         synchronized (ReduceTask.this) {
@@ -1467,7 +1466,7 @@
         byte[] shuffleData = new byte[mapOutputLength];
         MapOutput mapOutput = 
           new MapOutput(mapOutputLoc.getTaskId(), 
-                        mapOutputLoc.getTaskAttemptId(), shuffleData);
+                        mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
         
         int bytesRead = 0;
         try {
@@ -1741,12 +1740,10 @@
       return numInFlight > maxInFlight;
     }
     
-    long           reducerInputBytes = 0;
     
     public boolean fetchOutputs() throws IOException {
       int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
-      long           bytesTransferred = 0;
       DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
       final Progress copyPhase = 
         reduceTask.getProgress().phase();
@@ -1938,12 +1935,11 @@
             if (cr.getSuccess()) {  // a successful copy
               numCopied++;
               lastProgressTime = System.currentTimeMillis();
-              bytesTransferred += cr.getSize();
-              reducerInputBytes += cr.getSize();
+              reduceShuffleBytes.increment(cr.getSize());
                 
               long secsSinceStart = 
                 (System.currentTimeMillis()-startTime)/1000+1;
-              float mbs = ((float)bytesTransferred)/(1024*1024);
+              float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
               float transferRate = mbs/secsSinceStart;
                 
               copyPhase.startNextPhase();

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Tue Dec 16 02:17:02 2008
@@ -63,7 +63,7 @@
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
-    REDUCE_INPUT_BYTES, 
+    REDUCE_SHUFFLE_BYTES,
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Dec 16 02:17:02 2008
@@ -10,6 +10,7 @@
 COMBINE_INPUT_RECORDS.name=    Combine input records
 COMBINE_OUTPUT_RECORDS.name=   Combine output records
 REDUCE_INPUT_GROUPS.name=      Reduce input groups
+REDUCE_SHUFFLE_BYTES.name=	   Reduce shuffle bytes
 REDUCE_INPUT_RECORDS.name=     Reduce input records
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records



Mime
View raw message