hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r815163 - in /hadoop/common/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/
Date: Tue, 15 Sep 2009 06:30:57 GMT
Author: cdouglas
Date: Tue Sep 15 06:30:56 2009
New Revision: 815163

URL: http://svn.apache.org/viewvc?rev=815163&view=rev
Log:
MAPREDUCE-112. Add counters for reduce input, output records to the new API.
Contributed by Jothi Padmanabhan

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Tue Sep 15 06:30:56 2009
@@ -1,5 +1,12 @@
 Hadoop Change Log
 
+Release 0.20.2 - Unreleased
+
+  BUG FIXES
+
+    MAPREDUCE-112. Add counters for reduce input, output records to the new API.
+    (Jothi Padmanabhan via cdouglas)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 15 06:30:56 2009
@@ -553,11 +553,14 @@
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
         outputFormat.getRecordWriter(taskContext);
+     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
+       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.Reducer.Context 
          reducerContext = createReduceContext(reducer, job, getTaskID(),
-                                               rIter, reduceInputValueCounter, 
-                                               output, committer,
+                                               rIter, reduceInputKeyCounter,
+                                               reduceInputValueCounter, 
+                                               trackedRW, committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
     reducer.run(reducerContext);
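
The hunk above wraps the raw RecordWriter in NewTrackingRecordWriter so that reduce output records are counted as they are written, and passes the new reduceInputKeyCounter through to the reducer context. The body of NewTrackingRecordWriter is not included in this message; the snippet below is only a rough sketch of such a counting wrapper, under a hypothetical class name, not the patch's actual implementation.

    // Hypothetical sketch of a RecordWriter wrapper that counts output records.
    // The real NewTrackingRecordWriter added by this patch is not shown here.
    static class CountingRecordWriter<K, V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
      private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
      private final org.apache.hadoop.mapreduce.Counter outputCounter;

      CountingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K, V> real,
                           org.apache.hadoop.mapreduce.Counter outputCounter) {
        this.real = real;
        this.outputCounter = outputCounter;
      }

      @Override
      public void write(K key, V value)
          throws java.io.IOException, InterruptedException {
        real.write(key, value);             // delegate the actual write
        outputCounter.increment(1);         // one output record per write
      }

      @Override
      public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context)
          throws java.io.IOException, InterruptedException {
        real.close(context);
      }
    }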

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Tue Sep 15 06:30:56 2009
@@ -1022,6 +1022,7 @@
             org.apache.hadoop.mapreduce.TaskAttemptID.class,
             RawKeyValueIterator.class,
             org.apache.hadoop.mapreduce.Counter.class,
+            org.apache.hadoop.mapreduce.Counter.class,
             org.apache.hadoop.mapreduce.RecordWriter.class,
             org.apache.hadoop.mapreduce.OutputCommitter.class,
             org.apache.hadoop.mapreduce.StatusReporter.class,
@@ -1041,7 +1042,8 @@
                       Configuration job,
                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                       RawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                      org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
@@ -1051,7 +1053,8 @@
     try {
 
       return contextConstructor.newInstance(reducer, job, taskId,
-                                            rIter, inputCounter, output, 
+                                            rIter, inputKeyCounter, 
+                                            inputValueCounter, output, 
                                             committer, reporter, comparator, 
                                             keyClass, valueClass);
     } catch (InstantiationException e) {
@@ -1206,7 +1209,7 @@
           ReflectionUtils.newInstance(reducerClass, job);
       org.apache.hadoop.mapreduce.Reducer.Context 
            reducerContext = createReduceContext(reducer, job, taskId,
-                                                iterator, inputCounter, 
+                                                iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
                                                 reporter, comparator, keyClass,

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Tue Sep 15 06:30:56 2009
@@ -41,7 +41,8 @@
 public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   private RawKeyValueIterator input;
-  private Counter inputCounter;
+  private Counter inputKeyCounter;
+  private Counter inputValueCounter;
   private RawComparator<KEYIN> comparator;
   private KEYIN key;                                  // current key
   private VALUEIN value;                              // current value
@@ -57,7 +58,8 @@
 
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input, 
-                       Counter inputCounter,
+                       Counter inputKeyCounter,
+                       Counter inputValueCounter,
                        RecordWriter<KEYOUT,VALUEOUT> output,
                        OutputCommitter committer,
                        StatusReporter reporter,
@@ -67,7 +69,8 @@
                        ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
-    this.inputCounter = inputCounter;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
     this.comparator = comparator;
     SerializationFactory serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -83,6 +86,9 @@
       nextKeyValue();
     }
     if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
       return nextKeyValue();
     } else {
       return false;
@@ -109,7 +115,6 @@
     buffer.reset(next.getData(), next.getPosition(), next.getLength());
     value = valueDeserializer.deserialize(value);
     hasMore = input.next();
-    inputCounter.increment(1);
     if (hasMore) {
       next = input.getKey();
       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
@@ -121,6 +126,7 @@
     } else {
       nextKeyIsSame = false;
     }
+    inputValueCounter.increment(1);
     return true;
   }
 
@@ -189,4 +195,4 @@
   Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
     return iterable;
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Tue Sep 15 06:30:56 2009
@@ -121,7 +121,8 @@
     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     public Context(Configuration conf, TaskAttemptID taskid,
                    RawKeyValueIterator input, 
-                   Counter inputCounter,
+                   Counter inputKeyCounter,
+                   Counter inputValueCounter,
                    RecordWriter<KEYOUT,VALUEOUT> output,
                    OutputCommitter committer,
                    StatusReporter reporter,
@@ -129,7 +130,8 @@
                    Class<KEYIN> keyClass,
                    Class<VALUEIN> valueClass
                    ) throws IOException, InterruptedException {
-      super(conf, taskid, input, inputCounter, output, committer, reporter, 
+      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
+            output, committer, reporter, 
             comparator, keyClass, valueClass);
     }
   }
@@ -175,4 +177,4 @@
     }
     cleanup(context);
   }
-}
\ No newline at end of file
+}
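
None of this requires changes in user code; a reducer written against the new org.apache.hadoop.mapreduce API picks up the counters from the framework. For reference, a minimal hypothetical reducer (not part of this patch): with the changes above, each reduce() invocation corresponds to one increment of the input key counter, each value read from the Iterable to one increment of the input value counter, and each context.write() to one increment of the output record counter via the tracking RecordWriter.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical example; all counter bookkeeping is done by the framework,
    // not by the reducer itself.
    public class SumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
                            Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {      // each value consumed: +1 input record
          sum += v.get();
        }
        context.write(key, new IntWritable(sum));  // +1 output record
      }
    }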

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=815163&r1=815162&r2=815163&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Sep 15 06:30:56 2009
@@ -152,9 +152,14 @@
                                      "REDUCE_INPUT_RECORDS").getValue();
     long mapOut = ctrs.findCounter(COUNTER_GROUP, 
                                    "MAP_OUTPUT_RECORDS").getValue();
+    long reduceOut = ctrs.findCounter(COUNTER_GROUP,
+                                      "REDUCE_OUTPUT_RECORDS").getValue();
+    long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
+                                       "REDUCE_INPUT_GROUPS").getValue();
     assertEquals("map out = combine in", mapOut, combineIn);
     assertEquals("combine out = reduce in", combineOut, reduceIn);
     assertTrue("combine in > combine out", combineIn > combineOut);
+    assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
     String group = "Random Group";
     CounterGroup ctrGrp = ctrs.getGroup(group);
     assertEquals(0, ctrGrp.size());
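
The new assertions read the counters through Counters.findCounter(group, name). The same values can be inspected from a driver after a job finishes; a minimal sketch, assuming a completed org.apache.hadoop.mapreduce.Job named job and assuming the Task counter group name "org.apache.hadoop.mapred.Task$Counter" (both are illustrative, not part of this patch):

    // Hypothetical driver-side check of the reduce counters.
    org.apache.hadoop.mapreduce.Counters ctrs = job.getCounters();
    long reduceIn = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                     "REDUCE_INPUT_RECORDS").getValue();
    long reduceGroups = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                         "REDUCE_INPUT_GROUPS").getValue();
    long reduceOut = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                      "REDUCE_OUTPUT_RECORDS").getValue();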


