tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct (Eric Badger via jeagles)
Date Thu, 03 Nov 2016 18:00:35 GMT
Repository: tez
Updated Branches:
  refs/heads/master cac9237b5 -> d0bafcef5


TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct (Eric Badger via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d0bafcef
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d0bafcef
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d0bafcef

Branch: refs/heads/master
Commit: d0bafcef5617928d722d9763d53b68137bac192b
Parents: cac9237
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Nov 3 13:00:10 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Nov 3 13:00:10 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../tez/mapreduce/combine/MRCombiner.java       | 24 ++++---
 .../tez/mapreduce/combine/TestMRCombiner.java   | 73 ++++++++++++++++++++
 .../runtime/library/common/ValuesIterator.java  |  4 +-
 4 files changed, 92 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48c0f1e..58a3346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct
   TEZ-3247. Add more unit test coverage for container reuse.
   TEZ-3215. Support for MultipleOutputs.
   TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
@@ -139,6 +140,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct
   TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
   TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
@@ -640,6 +642,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct
   TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs

http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
index 5ad3136..9514215 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -71,8 +71,8 @@ public class MRCombiner implements Combiner {
   private final RawComparator<?> comparator;
   private final boolean useNewApi;
   
-  private final TezCounter combineInputKeyCounter;
-  private final TezCounter combineInputValueCounter;
+  private final TezCounter combineInputRecordsCounter;
+  private final TezCounter combineOutputRecordsCounter;
   
   private final MRTaskReporter reporter;
   private final TaskAttemptID mrTaskAttemptID;
@@ -95,8 +95,8 @@ public class MRCombiner implements Combiner {
 
     this.useNewApi = ConfigUtils.useNewApi(conf);
     
-    combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+    combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
     
     boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
     this.mrTaskAttemptID = new TaskAttemptID(
@@ -130,6 +130,7 @@ public class MRCombiner implements Combiner {
       @Override
       public void collect(Object key, Object value) throws IOException {
         writer.append(key, value);
+        combineOutputRecordsCounter.increment(1);
       }
     };
     
@@ -145,7 +146,7 @@ public class MRCombiner implements Combiner {
         Class<KEY> keyClass, Class<VALUE> valClass,
         RawComparator<KEY> comparator) throws IOException {
       super(rawIter, comparator, keyClass, valClass, conf,
-          combineInputKeyCounter, combineInputValueCounter);
+          null, combineInputRecordsCounter);
     }
   }
   
@@ -161,6 +162,7 @@ public class MRCombiner implements Combiner {
       public void write(Object key, Object value) throws IOException,
           InterruptedException {
         writer.append(key, value);
+        combineOutputRecordsCounter.increment(1);
       }
 
       @Override
@@ -180,8 +182,8 @@ public class MRCombiner implements Combiner {
             conf,
             mrTaskAttemptID,
             rawIter,
-            new MRCounters.MRCounter(combineInputKeyCounter),
-            new MRCounters.MRCounter(combineInputValueCounter),
+            new MRCounters.MRCounter(combineInputRecordsCounter),
+            new MRCounters.MRCounter(combineOutputRecordsCounter),
             recordWriter,
             reporter,
             (RawComparator)comparator,
@@ -196,8 +198,8 @@ public class MRCombiner implements Combiner {
       Configuration conf,
       TaskAttemptID mrTaskAttemptID,
       final TezRawKeyValueIterator rawIter,
-      Counter combineInputKeyCounter,
-      Counter combineInputValueCounter,
+      Counter combineInputRecordsCounter,
+      Counter combineOutputRecordsCounter,
       RecordWriter<KEYOUT, VALUEOUT> recordWriter,
       MRTaskReporter reporter,
       RawComparator<KEYIN> comparator,
@@ -233,8 +235,8 @@ public class MRCombiner implements Combiner {
     };
 
     ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
-        conf, mrTaskAttemptID, r, combineInputKeyCounter,
-        combineInputValueCounter, recordWriter, null, reporter, comparator,
+        conf, mrTaskAttemptID, r, null,
+        combineInputRecordsCounter, recordWriter, null, reporter, comparator,
         keyClass, valClass);
 
     org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()

http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
index a92f8dd..a796e59 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/combine/TestMRCombiner.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
@@ -44,6 +45,8 @@ import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestMRCombiner {
 
   @Test
@@ -55,6 +58,10 @@ public class TestMRCombiner {
     MRCombiner combiner = new MRCombiner(taskContext);
     Writer writer = Mockito.mock(Writer.class);
     combiner.combine(new TezRawKeyValueIteratorTest(), writer);
+    long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
+    long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
+    assertEquals(6, inputRecords);
+    assertEquals(3, outputRecords);
     // verify combiner output keys and values
     verifyKeyAndValues(writer);
   }
@@ -70,10 +77,46 @@ public class TestMRCombiner {
     MRCombiner combiner = new MRCombiner(taskContext);
     Writer writer = Mockito.mock(Writer.class);
     combiner.combine(new TezRawKeyValueIteratorTest(), writer);
+    long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
+    long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
+    assertEquals(6, inputRecords);
+    assertEquals(3, outputRecords);
     // verify combiner output keys and values
     verifyKeyAndValues(writer);
   }
 
+  @Test
+  public void testTop2RunOldCombiner() throws IOException, InterruptedException {
+    TezConfiguration conf = new TezConfiguration();
+    setKeyAndValueClassTypes(conf);
+    conf.setClass("mapred.combiner.class", Top2OldReducer.class, Object.class);
+    TaskContext taskContext = getTaskContext(conf);
+    MRCombiner combiner = new MRCombiner(taskContext);
+    Writer writer = Mockito.mock(Writer.class);
+    combiner.combine(new TezRawKeyValueIteratorTest(), writer);
+    long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
+    long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
+    assertEquals(6, inputRecords);
+    assertEquals(5, outputRecords);
+  }
+
+  @Test
+  public void testTop2RunNewCombiner() throws IOException, InterruptedException {
+    TezConfiguration conf = new TezConfiguration();
+    setKeyAndValueClassTypes(conf);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, Top2NewReducer.class,
+        Object.class);
+    TaskContext taskContext = getTaskContext(conf);
+    MRCombiner combiner = new MRCombiner(taskContext);
+    Writer writer = Mockito.mock(Writer.class);
+    combiner.combine(new TezRawKeyValueIteratorTest(), writer);
+    long inputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
+    long outputRecords = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
+    assertEquals(6, inputRecords);
+    assertEquals(5, outputRecords);
+  }
+
   private void setKeyAndValueClassTypes(TezConfiguration conf) {
     conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
         Text.class, Object.class);
@@ -185,4 +228,34 @@ public class TestMRCombiner {
       context.write(new Text(key.toString()), new IntWritable(count));
     }
   }
+
+  private static class Top2OldReducer extends OldReducer {
+    @Override
+    public void reduce(Text key, Iterator<IntWritable> value,
+        OutputCollector<Text, IntWritable> collector, Reporter reporter)
+        throws IOException {
+      int i = 0;
+      while (value.hasNext()) {
+        int val = value.next().get();
+        if (i++ < 2) {
+          collector.collect(new Text(key.toString()), new IntWritable(val));
+        }
+      }
+    }
+  }
+
+  private static class Top2NewReducer extends NewReducer {
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+      int i = 0;
+      for (IntWritable value : values) {
+        if (i++ < 2) {
+          context.write(new Text(key.toString()), value);
+        } else {
+          break;
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d0bafcef/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index f4da742..7add8c5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -189,7 +189,9 @@ public class ValuesIterator<KEY,VALUE> {
         if (key == null || false == hasMoreValues) {
           // invariant: more=true & there are no more values in an existing key group
           // so this indicates start of new key group
-          inputKeyCounter.increment(1);
+          if(inputKeyCounter != null) {
+            inputKeyCounter.increment(1);
+          }
           ++keyCtr;
         }
       } else {


Mime
View raw message