tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2313. Regression in handling obsolete events in ShuffleScheduler (rbalamohan)
Date Tue, 21 Apr 2015 05:41:14 GMT
Repository: tez
Updated Branches:
  refs/heads/master 44046f8a4 -> decb4191b


TEZ-2313. Regression in handling obsolete events in ShuffleScheduler (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/decb4191
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/decb4191
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/decb4191

Branch: refs/heads/master
Commit: decb4191bbada28749483b1b5af837fc87aff8bc
Parents: 44046f8
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Tue Apr 21 11:10:33 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Tue Apr 21 11:10:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 .../common/shuffle/orderedgrouped/ShuffleScheduler.java      | 8 +++++---
 .../TestShuffleInputEventHandlerOrderedGrouped.java          | 4 ++++
 3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64b3561..bd9ff6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2313. Regression in handling obsolete events in ShuffleScheduler.
   TEZ-2212. Notify components on DAG completion.
   TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads
   to tez.runtime.pipelined.sorter.sort.threads.

http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index d0b6346..a3d79ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -349,6 +349,10 @@ class ShuffleScheduler {
             + ", newAttemptNum=" + input.getAttemptNumber()));
         return false;
       }
+
+      if (eventInfo == null) {
+        shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
+      }
     }
     return true;
   }
@@ -523,9 +527,6 @@ class ShuffleScheduler {
     if (!validateInputAttemptForPipelinedShuffle(srcAttempt)) {
       return;
     }
-    if (shuffleInfoEventsMap.get(srcAttempt.getInputIdentifier()) == null) {
-      shuffleInfoEventsMap.put(srcAttempt.getInputIdentifier(), new ShuffleEventInfo(srcAttempt));
-    }
 
     host.addKnownMap(srcAttempt);
     pathToIdentifierMap.put(
@@ -542,6 +543,7 @@ class ShuffleScheduler {
     // The incoming srcAttempt does not contain a path component.
     LOG.info("Adding obsolete input: " + srcAttempt);
     if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
+      //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
       //Fail fast here.
       shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput,
but it "
           + "exists in shuffleInfoEventMap. Some data could have been already merged "

http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 460db01..eed9fd8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -236,6 +236,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
 
     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri),
eq(id1));
+    assertTrue("Shuffle info events should not be empty for pipelined shuffle",
+        !scheduler.shuffleInfoEventsMap.isEmpty());
 
     //Attempt #0 comes up. When processing this, it should report exception
     attemptNum = 0;
@@ -263,6 +265,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     int partitionId = srcIdx;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
         eq(baseUri), eq(expectedIdentifier));
+    assertTrue("Shuffle info events should be empty for regular shuffle codepath",
+        scheduler.shuffleInfoEventsMap.isEmpty());
   }
 
   @Test(timeout = 5000)


Mime
View raw message