tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts. Contributed by Jason Lowe.
Date Thu, 30 Jun 2016 23:18:15 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 883e76fd5 -> a83479b1c


TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory
merge that never starts. Contributed by Jason Lowe.

(cherry picked from commit 71bb2defe97e55e3bf7dbb299fe33ab8a667e7a1)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a83479b1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a83479b1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a83479b1

Branch: refs/heads/branch-0.8
Commit: a83479b1cd114a8771caa3041a1babcdeed3a975
Parents: 883e76f
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Jun 30 16:13:23 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Jun 30 16:18:13 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../FetchedInputAllocatorOrderedGrouped.java    |  1 +
 .../shuffle/orderedgrouped/InMemoryReader.java  |  2 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  7 +++++-
 .../orderedgrouped/TestMergeManager.java        | 26 ++++++++++++++++++++
 5 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e0b751..0e50032 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
   TEZ-3302. Add a version of processorContext.waitForAllInputsReady and waitForAnyInputReady with a timeout.
@@ -471,6 +472,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
   TEZ-3304. TestHistoryParser fails with Hadoop 2.7.
   TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements

http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
index 7276f74..e145632 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -36,4 +36,5 @@ public interface FetchedInputAllocatorOrderedGrouped {
 
   void unreserve(long bytes);
 
+  void releaseCommittedMemory(long bytes);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 7860377..12fe057 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -258,7 +258,7 @@ public class InMemoryReader extends Reader {
     buffer = null;
     // Inform the MergeManager
     if (merger != null) {
-      merger.unreserve(bufferSize);
+      merger.releaseCommittedMemory(bufferSize);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9e2fbd4..26bdca7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -451,7 +451,6 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
   @Override
   public synchronized void unreserve(long size) {
-    commitMemory -= size;
     usedMemory -= size;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Notifying unreserve : size=" + size + ", commitMemory=" + commitMemory + ", usedMemory=" + usedMemory
@@ -461,6 +460,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   }
 
   @Override
+  public synchronized void releaseCommittedMemory(long size) {
+    commitMemory -= size;
+    unreserve(size);
+  }
+
+  @Override
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
     inMemoryMapOutputs.add(mapOutput);
     LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()

http://git-wip-us.apache.org/repos/asf/tez/blob/a83479b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9eb4cae..9209ff4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -178,6 +178,32 @@ public class TestMergeManager {
     Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
   }
 
+  @Test(timeout = 10000)
+  public void testReservationAccounting() throws IOException {
+    Configuration conf = new TezConfiguration(defaultConf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, null, inputContext, null, null, null, null,
+        mock(ExceptionReporter.class), 2000000, null, false, -1);
+    mergeManager.configureAndStart();
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    MapOutput mapOutput = mergeManager.reserve(null, 1, 1, 0);
+    assertEquals(1, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    mapOutput.abort();
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+    mapOutput = mergeManager.reserve(null, 2, 2, 0);
+    mergeManager.closeInMemoryFile(mapOutput);
+    assertEquals(2, mergeManager.getUsedMemory());
+    assertEquals(2, mergeManager.getCommitMemory());
+    mergeManager.releaseCommittedMemory(2);
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+  }
+
   @Test(timeout=20000)
   public void testIntermediateMemoryMergeAccounting() throws Exception {
     Configuration conf = new TezConfiguration(defaultConf);


Mime
View raw message