tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception (Tsuyoshi Ozawa via rbalamohan)
Date Fri, 04 Mar 2016 06:00:04 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2af886b50 -> 91e24d7c6


TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception (Tsuyoshi Ozawa via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/91e24d7c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/91e24d7c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/91e24d7c

Branch: refs/heads/master
Commit: 91e24d7c65b150150125992095268aed77adc386
Parents: 2af886b
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Mar 4 11:28:15 2016 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Mar 4 11:29:08 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../shuffle/orderedgrouped/MergeManager.java    | 31 +++++-----
 .../common/shuffle/orderedgrouped/Shuffle.java  |  4 +-
 .../orderedgrouped/TestMergeManager.java        | 64 ++++++++++++++++++--
 4 files changed, 80 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c6cacc..e84dc5a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception.
   TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
   TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3151. Expose DAG credentials to plugins.

http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 0b0f6b6..4c999b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -553,9 +553,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     return finalMergeComplete;
   }
 
-  public TezRawKeyValueIterator close() throws Throwable {
-    // TODO TEZ-2756. Don't attempt a final merge if close is invoked as a result of a previous
-    // shuffle exception / error.
+  public TezRawKeyValueIterator close(boolean tryFinalMerge) throws Throwable {
     if (!isShutdown.getAndSet(true)) {
       // Wait for on-going merges to complete
       if (memToMemMerger != null) {
@@ -571,18 +569,23 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       inMemoryMapOutputs.clear();
       List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
       onDiskMapOutputs.clear();
-      try {
-        TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
-        this.finalMergeComplete = true;
-        return kvIter;
-      } catch (InterruptedException e) {
-        //Cleanup the disk segments
-        if (cleanup) {
-          cleanup(localFS, disk);
-          cleanup(localFS, onDiskMapOutputs);
+
+      // Don't attempt a final merge if close is invoked as a result of a previous
+      // shuffle exception / error.
+      if (tryFinalMerge) {
+        try {
+          TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+          this.finalMergeComplete = true;
+          return kvIter;
+        } catch (InterruptedException e) {
+          //Cleanup the disk segments
+          if (cleanup) {
+            cleanup(localFS, disk);
+            cleanup(localFS, onDiskMapOutputs);
+          }
+          Thread.currentThread().interrupt(); //reset interrupt status
+          throw e;
         }
-        Thread.currentThread().interrupt(); //reset interrupt status
-        throw e;
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index fa66b7e..f40c49a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -310,7 +310,7 @@ public class Shuffle implements ExceptionReporter {
       TezRawKeyValueIterator kvIter = null;
       inputContext.notifyProgress();
       try {
-        kvIter = merger.close();
+        kvIter = merger.close(true);
       } catch (Throwable e) {
         // Set the throwable so that future.get() sees the reported errror.
         throwable.set(e);
@@ -351,7 +351,7 @@ public class Shuffle implements ExceptionReporter {
   private void cleanupMerger(boolean ignoreErrors) throws Throwable {
     if (!mergerClosed.getAndSet(true)) {
       try {
-        merger.close();
+        merger.close(false);
       } catch (InterruptedException e) {
         if (ignoreErrors) {
           //Reset the status

http://git-wip-us.apache.org/repos/asf/tez/blob/91e24d7c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index c84794d..9eb4cae 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -306,7 +306,7 @@ public class TestMergeManager {
     assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
     assertEquals(1, mergeManager.inMemoryMapOutputs.size());
 
-    mergeManager.close();
+    mergeManager.close(true);
 
 
     /**
@@ -367,7 +367,7 @@ public class TestMergeManager {
     assertEquals(1, mergeManager.inMemoryMergedMapOutputs.size());
     assertEquals(2, mergeManager.inMemoryMapOutputs.size());
 
-    mergeManager.close();
+    mergeManager.close(true);
 
     /**
      * Test #3
@@ -421,7 +421,7 @@ public class TestMergeManager {
     assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
     assertEquals(4, mergeManager.inMemoryMapOutputs.size());
 
-    mergeManager.close();
+    mergeManager.close(true);
 
     /**
      * Test #4
@@ -481,7 +481,63 @@ public class TestMergeManager {
     //Check if inMemorySegment has got the MapOutput back for merging later
     assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size());
 
-    mergeManager.close();
+    mergeManager.close(true);
+
+    /**
+     * Test #5
+     * - Same to #4, but calling mergeManager.close(false) and confirm that final merge doesn't occur.
+     */
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 4);
+    mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null,
null,
+            exceptionReporter, 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+
+    //Single shuffle limit is 25% of 2000000
+    data1 = generateDataBySize(conf, 490000);
+    data2 = generateDataBySize(conf, 490000);
+    data3 = generateDataBySize(conf, 490000);
+    data4 = generateDataBySize(conf, 230000);
+
+    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length,
0);
+    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length,
0);
+    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length,
0);
+    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length,
0);
+
+    assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
+
+    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
+    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    assertEquals(data1.length + data2.length + data3.length + data4.length,
+        mergeManager.getUsedMemory());
+
+    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
+    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
+    System.arraycopy(data3, 0, mo3.getMemory(), 0, data3.length);
+    System.arraycopy(data4, 0, mo4.getMemory(), 0, data4.length);
+
+    //Committing 4 segments should trigger mem-to-mem merge
+    mo1.commit();
+    mo2.commit();
+    mo3.commit();
+    mo4.commit();
+
+    //4 segments were there originally in inMemoryMapOutput.
+    numberOfMapOutputs = 4;
+
+    //Wait for mem-to-mem to complete. Since only 1 segment (230000) can fit
+    //into memory, it should return early
+    mergeManager.waitForMemToMemMerge();
+
+    //Check if inMemorySegment has got the MapOutput back for merging later
+    assertEquals(numberOfMapOutputs, mergeManager.inMemoryMapOutputs.size());
+
+    Assert.assertNull(mergeManager.close(false));
+    Assert.assertFalse(mergeManager.isMergeComplete());
   }
 
   private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {


Mime
View raw message