hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r815162 - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapreduce/
Date Tue, 15 Sep 2009 06:30:41 GMT
Author: cdouglas
Date: Tue Sep 15 06:30:41 2009
New Revision: 815162

URL: http://svn.apache.org/viewvc?rev=815162&view=rev
Log:
MAPREDUCE-112. Add counters for reduce input, output records to the new API.
Contributed by Jothi Padmanabhan

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep 15 06:30:41 2009
@@ -590,3 +590,6 @@
 
     MAPREDUCE-839. unit test TestMiniMRChildTask fails on mac os-x (hong tang
     via mahadev)
+
+    MAPREDUCE-112. Add counters for reduce input, output records to the new API.
+    (Jothi Padmanabhan via cdouglas)

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java Tue Sep 15 06:30:41 2009
@@ -68,7 +68,7 @@
 
       super(new Configuration(),
             new TaskAttemptID("mrunit-jt", 0, TaskType.REDUCE, 0, 0),
-            new MockRawKeyValueIterator(), null, null,
+            new MockRawKeyValueIterator(), null, null, null,
             new MockOutputCommitter(), new MockReporter(counters), null,
             (Class) Text.class, (Class) Text.class);
       this.inputIter = in.iterator();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Sep 15 06:30:41 2009
@@ -523,11 +523,15 @@
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
         outputFormat.getRecordWriter(taskContext);
+    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
+      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.Reducer.Context 
          reducerContext = createReduceContext(reducer, job, getTaskID(),
-                                               rIter, reduceInputValueCounter, 
-                                               output, committer,
+                                               rIter, reduceInputKeyCounter, 
+                                               reduceInputValueCounter, 
+                                               trackedRW,
+                                               committer,
                                                reporter, comparator, keyClass,
                                                valueClass);
     reducer.run(reducerContext);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Sep 15 06:30:41 2009
@@ -1069,6 +1069,7 @@
             org.apache.hadoop.mapreduce.TaskAttemptID.class,
             RawKeyValueIterator.class,
             org.apache.hadoop.mapreduce.Counter.class,
+            org.apache.hadoop.mapreduce.Counter.class,
             org.apache.hadoop.mapreduce.RecordWriter.class,
             org.apache.hadoop.mapreduce.OutputCommitter.class,
             org.apache.hadoop.mapreduce.StatusReporter.class,
@@ -1088,7 +1089,8 @@
                       Configuration job,
                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                       RawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputCounter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
@@ -1098,7 +1100,8 @@
     try {
 
       return contextConstructor.newInstance(reducer, job, taskId,
-                                            rIter, inputCounter, output, 
+                                            rIter, inputKeyCounter, 
+                                            inputValueCounter, output, 
                                             committer, reporter, comparator, 
                                             keyClass, valueClass);
     } catch (InstantiationException e) {
@@ -1258,7 +1261,7 @@
           ReflectionUtils.newInstance(reducerClass, job);
       org.apache.hadoop.mapreduce.Reducer.Context 
            reducerContext = createReduceContext(reducer, job, taskId,
-                                                iterator, inputCounter, 
+                                                iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
                                                 reporter, comparator, keyClass,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Tue Sep 15 06:30:41 2009
@@ -45,7 +45,8 @@
 public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   private RawKeyValueIterator input;
-  private Counter inputCounter;
+  private Counter inputValueCounter;
+  private Counter inputKeyCounter;
   private RawComparator<KEYIN> comparator;
   private KEYIN key;                                  // current key
   private VALUEIN value;                              // current value
@@ -70,7 +71,8 @@
   
   public ReduceContext(Configuration conf, TaskAttemptID taskid,
                        RawKeyValueIterator input, 
-                       Counter inputCounter,
+                       Counter inputKeyCounter,
+                       Counter inputValueCounter,
                        RecordWriter<KEYOUT,VALUEOUT> output,
                        OutputCommitter committer,
                        StatusReporter reporter,
@@ -80,7 +82,8 @@
                        ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
-    this.inputCounter = inputCounter;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
     this.comparator = comparator;
     this.serializationFactory = new SerializationFactory(conf);
     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
@@ -100,6 +103,9 @@
       nextKeyValue();
     }
     if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
       return nextKeyValue();
     } else {
       return false;
@@ -134,7 +140,6 @@
     }
 
     hasMore = input.next();
-    inputCounter.increment(1);
     if (hasMore) {
       nextKey = input.getKey();
       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
@@ -146,6 +151,7 @@
     } else {
       nextKeyIsSame = false;
     }
+    inputValueCounter.increment(1);
     return true;
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java Tue Sep 15 06:30:41 2009
@@ -121,7 +121,8 @@
     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     public Context(Configuration conf, TaskAttemptID taskid,
                    RawKeyValueIterator input, 
-                   Counter inputCounter,
+                   Counter inputKeyCounter,
+                   Counter inputValueCounter,
                    RecordWriter<KEYOUT,VALUEOUT> output,
                    OutputCommitter committer,
                    StatusReporter reporter,
@@ -129,8 +130,9 @@
                    Class<KEYIN> keyClass,
                    Class<VALUEIN> valueClass
                    ) throws IOException, InterruptedException {
-      super(conf, taskid, input, inputCounter, output, committer, reporter, 
-            comparator, keyClass, valueClass);
+      super(conf, taskid, input, inputKeyCounter, inputValueCounter, 
+          output, committer, reporter, 
+          comparator, keyClass, valueClass);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=815162&r1=815161&r2=815162&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Sep 15 06:30:41 2009
@@ -158,9 +158,14 @@
                                      "REDUCE_INPUT_RECORDS").getValue();
     long mapOut = ctrs.findCounter(COUNTER_GROUP, 
                                    "MAP_OUTPUT_RECORDS").getValue();
+    long reduceOut = ctrs.findCounter(COUNTER_GROUP,
+                                      "REDUCE_OUTPUT_RECORDS").getValue();
+    long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
+                                       "REDUCE_INPUT_GROUPS").getValue();
     assertEquals("map out = combine in", mapOut, combineIn);
     assertEquals("combine out = reduce in", combineOut, reduceIn);
     assertTrue("combine in > combine out", combineIn > combineOut);
+    assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
     String group = "Random Group";
     CounterGroup ctrGrp = ctrs.getGroup(group);
     assertEquals(0, ctrGrp.size());



Mime
View raw message