tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure (Rajesh Balamohan)
Date Wed, 25 Mar 2015 00:35:18 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 4facc6947 -> a95c2bbac


TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure (Rajesh Balamohan)

(cherry picked from commit eb330412bf1f4b19f316048635fc9adff09a0b02)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a95c2bba
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a95c2bba
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a95c2bba

Branch: refs/heads/branch-0.5
Commit: a95c2bbacd686d713c139099dee7ea391b0b4cf3
Parents: 4facc69
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Jan 9 10:13:26 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Mar 25 06:04:48 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../orderedgrouped/FetcherOrderedGrouped.java   |  4 +-
 .../shuffle/orderedgrouped/MergeManager.java    | 59 +++++++++++++-------
 3 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a95c2bba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 601bae6..9d2c429 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure.
   TEZ-2219. Should verify the input_name/output_name to be unique per vertex
   TEZ-2186. OOM with a simple scatter gather job with re-use
   TEZ-2220. TestTezJobs compile failure in branch 0.5.

http://git-wip-us.apache.org/repos/asf/tez/blob/a95c2bba/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index d51e45e..14772e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -153,6 +153,9 @@ class FetcherOrderedGrouped extends Thread {
           // If merge is on, block
           merger.waitForInMemoryMerge();
 
+          // In case usedMemory > memorylimit, wait until some memory is released
+          merger.waitForShuffleToMergeMemory();
+
           // Get a host to shuffle from
           host = scheduler.getHost();
           metrics.threadBusy();
@@ -455,7 +458,6 @@ class FetcherOrderedGrouped extends Thread {
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
-        // TODO Review: Does this cause a tight loop ?
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
         //Not an error but wait to process data.
         return EMPTY_ATTEMPT_ID_ARRAY;

http://git-wip-us.apache.org/repos/asf/tez/blob/a95c2bba/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index fd8b1ea..ad50bb5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -331,6 +331,12 @@ public class MergeManager {
     return (requestedSize < maxSingleShuffleLimit);
   }
 
+  public synchronized void waitForShuffleToMergeMemory() throws InterruptedException {
+    while(usedMemory > memoryLimit) {
+      wait();
+    }
+  }
+
   final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
 
   public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
@@ -362,16 +368,20 @@ public class MergeManager {
     // all the stalled threads
     
     if (usedMemory > memoryLimit) {
-      LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
-          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
-          " CommitMemory is (" + commitMemory + ")"); 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
+            + ") is greater than memoryLimit (" + memoryLimit + ")." +
+            " CommitMemory is (" + commitMemory + ")");
+      }
       return stallShuffle;
     }
     
     // Allow the in-memory shuffle to progress
-    LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
-        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
-        + "CommitMemory is (" + commitMemory + ")"); 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
+          + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+          + "CommitMemory is (" + commitMemory + ")");
+    }
     return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
   }
   
@@ -389,6 +399,11 @@ public class MergeManager {
   synchronized void unreserve(long size) {
     commitMemory -= size;
     usedMemory -= size;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Notifying unreserve : commitMemory=" + commitMemory + ", usedMemory=" + usedMemory
+          + ", mergeThreshold=" + mergeThreshold);
+    }
+    notifyAll();
   }
 
   public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
@@ -399,18 +414,8 @@ public class MergeManager {
 
     commitMemory+= mapOutput.getSize();
 
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      // TODO Can avoid spilling in case total input size is between
-      // mergeTghreshold and total available size.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      } 
+    if (commitMemory >= mergeThreshold) {
+      startMemToDiskMerge();
     }
 
     // This should likely run a Combiner.
@@ -423,7 +428,19 @@ public class MergeManager {
       }
     }
   }
-  
+
+  private void startMemToDiskMerge() {
+    synchronized (inMemoryMerger) {
+      if (!inMemoryMerger.isInProgress()) {
+        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+            commitMemory + " > mergeThreshold=" + mergeThreshold +
+            ". Current usedMemory=" + usedMemory);
+        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+        inMemoryMergedMapOutputs.clear();
+        inMemoryMerger.startMerge(inMemoryMapOutputs);
+      }
+    }
+  }
   
   public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
     inMemoryMergedMapOutputs.add(mapOutput);
@@ -506,8 +523,8 @@ public class MergeManager {
       long mergeOutputSize = 
         createInMemorySegments(inputs, inMemorySegments, 0);
       int noInMemorySegments = inMemorySegments.size();
-      
-      MapOutput mergedMapOutputs = 
+
+      MapOutput mergedMapOutputs =
         unconditionalReserve(dummyMapId, mergeOutputSize, false);
       
       Writer writer = 


Mime
View raw message