tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [01/50] [abbrv] tez git commit: TEZ-2363: Fix off-by-one error in REDUCE_INPUT_RECORDS counter (gopalv)
Date Wed, 06 May 2015 07:41:03 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 437b57152 -> 9338abfe9 (forced update)


TEZ-2363: Fix off-by-one error in REDUCE_INPUT_RECORDS counter (gopalv)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e36f962e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e36f962e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e36f962e

Branch: refs/heads/TEZ-2003
Commit: e36f962e78f301974cd8eed2380a6a5bd26a49ae
Parents: 4ae87f0
Author: Gopal V <gopalv@apache.org>
Authored: Thu Apr 30 20:40:49 2015 +0530
Committer: Gopal V <gopalv@apache.org>
Committed: Thu Apr 30 20:40:49 2015 +0530

----------------------------------------------------------------------
 .../runtime/library/common/ValuesIterator.java  | 19 ++---
 .../library/common/TestValuesIterator.java      | 73 +++++++++++++++++++-
 2 files changed, 81 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e36f962e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index 0f1bc5b..a1f52e7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -95,14 +95,13 @@ public class ValuesIterator<KEY,VALUE> {
       readNextKey();
       key = nextKey;
       nextKey = null;
-      hasMoreValues = more;
       isFirstRecord = false;
     } else {
       nextKey();
     }
     return more;
   }
-  
+
   /** The current key. */
   public KEY getKey() { 
     return key; 
@@ -162,11 +161,7 @@ public class ValuesIterator<KEY,VALUE> {
     while (hasMoreValues) { 
       readNextKey();
     }
-    if (more) {
-      inputKeyCounter.increment(1);
-      ++keyCtr;
-    }
-    
+
     // move the next key to the current one
     KEY tmpKey = key;
     key = nextKey;
@@ -185,8 +180,14 @@ public class ValuesIterator<KEY,VALUE> {
         keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
             nextKeyBytes.getLength() - nextKeyBytes.getPosition());
         nextKey = keyDeserializer.deserialize(nextKey);
-        // TODO Is a counter increment required here ?
-        hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+        // hasMoreValues = is it first key or is key the same?
+        hasMoreValues = (key == null) || (comparator.compare(key, nextKey) == 0);
+        if (key == null || false == hasMoreValues) {
+          // invariant: more=true & there are no more values in an existing key group
+          // so this indicates start of new key group
+          inputKeyCounter.increment(1);
+          ++keyCtr;
+        }
       } else {
         hasMoreValues = in.isSameKey();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/e36f962e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index c483a81..e1718c8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -64,6 +64,7 @@ import java.util.TreeMap;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -179,6 +180,32 @@ public class TestValuesIterator {
   }
 
   @Test(timeout = 20000)
+  public void testCountedIteratorWithInmemoryReader() throws IOException {
+    verifyCountedIteratorReader(true);
+  }
+
+  @Test(timeout = 20000)
+  public void testCountedIteratorWithIFileReader() throws IOException {
+    verifyCountedIteratorReader(false);
+  }
+
+  private void verifyCountedIteratorReader(boolean inMemory) throws IOException {
+    TezCounter keyCounter = new GenericCounter("inputKeyCounter", "y3");
+    TezCounter tupleCounter = new GenericCounter("inputValuesCounter", "y4");
+    ValuesIterator iterator = createCountedIterator(inMemory, keyCounter,
+        tupleCounter);
+    List<Integer> sequence = verifyIteratorData(iterator);
+    if (expectedTestResult) {
+      assertEquals((long) sequence.size(), keyCounter.getValue());
+      long rows = 0;
+      for (Integer i : sequence) {
+        rows += i.longValue();
+      }
+      assertEquals(rows, tupleCounter.getValue());
+    }
+  }
+
+  @Test(timeout = 20000)
   public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
     ValuesIterator iterator = createEmptyIterator(false);
     assert(iterator.moveToNext() == false);
@@ -212,13 +239,19 @@ public class TestValuesIterator {
 
   /**
    * Tests whether data in valuesIterator matches with sorted input data set.
-   *
+   * 
+   * Returns a list of value counts for each key.
+   * 
    * @param valuesIterator
+   * @return List
    * @throws IOException
    */
-  private void verifyIteratorData(ValuesIterator valuesIterator) throws IOException {
+  private List<Integer> verifyIteratorData(
+      ValuesIterator valuesIterator) throws IOException {
     boolean result = true;
 
+    ArrayList<Integer> sequence = new ArrayList<Integer>();
+
     //sort original data based on comparator
     ListMultimap<Writable, Writable> sortedMap =
         new ImmutableListMultimap.Builder<Writable, Writable>()
@@ -240,6 +273,7 @@ public class TestValuesIterator {
         break;
       }
 
+      int valueCount = 0;
       //Verify values
       Iterator<Writable> vItr = valuesIterator.getValues().iterator();
       for (Writable val : sortedMap.get(oriKey)) {
@@ -250,13 +284,19 @@ public class TestValuesIterator {
           result = false;
           break;
         }
+
+        valueCount++;
       }
+      sequence.add(valueCount);
+      assertTrue("At least 1 value per key", valueCount > 0);
     }
     if (expectedTestResult) {
       assertTrue(result);
     } else {
       assertFalse(result);
     }
+
+    return sequence;
   }
 
   /**
@@ -287,6 +327,35 @@ public class TestValuesIterator {
         (TezCounter) new GenericCounter("inputValueCounter", "y4"));
   }
 
+  /**
+   * Create sample data (in memory), with an attached counter  and return ValuesIterator
+   *
+   * @param inMemory
+   * @param keyCounter
+   * @param tupleCounter
+   * @return ValuesIterator
+   * @throws IOException
+   */
+  private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) throws IOException {
+    if (!inMemory) {
+      streamPaths = createFiles();
+      //Merge all files to get KeyValueIterator
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, null,
+              false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
+              new ProgressReporter(), null, null, null, null);
+    } else {
+      List<TezMerger.Segment> segments = createInMemStreams();
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+              comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
+              new GenericCounter("writesCounter", "y1"),
+              new GenericCounter("bytesReadCounter", "y2"), new Progress());
+    }
+    return new ValuesIterator(rawKeyValueIterator, comparator,
+        keyClass, valClass, conf, keyCounter, tupleCounter);
+  }
+
   @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]")
   public static Collection<Object[]> getParameters() {
     Collection<Object[]> parameters = new ArrayList<Object[]>();


Mime
View raw message