tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3767. Shuffle should not report error to AM during inputContext.killSelf(). (rbalamohan)
Date Wed, 28 Jun 2017 00:54:36 GMT
Repository: tez
Updated Branches:
  refs/heads/master de72fbe62 -> 020a7c873


TEZ-3767. Shuffle should not report error to AM during inputContext.killSelf(). (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/020a7c87
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/020a7c87
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/020a7c87

Branch: refs/heads/master
Commit: 020a7c87381872ea138ecfcada2846b270a4f471
Parents: de72fbe
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Jun 28 06:24:29 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Jun 28 06:24:29 2017 +0530

----------------------------------------------------------------------
 .../orderedgrouped/ExceptionReporter.java       |  1 +
 .../common/shuffle/orderedgrouped/Shuffle.java  |  9 ++++++
 .../orderedgrouped/ShuffleScheduler.java        |  6 +---
 .../shuffle/orderedgrouped/TestShuffle.java     | 30 ++++++++++++++++++++
 4 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
index 8739dd2..1fa99af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
@@ -22,4 +22,5 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
  */
 interface ExceptionReporter {
   void reportException(Throwable t);
+  void killSelf(Exception exception, String message);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f787c59..0089d8c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -407,6 +407,15 @@ public class Shuffle implements ExceptionReporter {
       cleanupShuffleSchedulerIgnoreErrors();
     }
   }
+
+  @Private
+  @Override
+  public synchronized void killSelf(Exception exception, String message) {
+    if (!isShutDown.get() && throwable.get() == null) {
+      shutdown();
+      inputContext.killSelf(exception, message);
+    }
+  }
   
   public static class ShuffleError extends IOException {
     private static final long serialVersionUID = 5753909320586607881L;

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index c42ffda..b223c1a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -728,11 +728,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   void killSelf(Exception exception, String message) {
     LOG.error(message, exception);
-    try {
-      this.close();
-    } finally {
-      this.inputContext.killSelf(exception, message);
-    }
+    exceptionReporter.killSelf(exception, message);
   }
 
   private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/tez/blob/020a7c87/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
index cad9523..a28b1fa 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java
@@ -108,6 +108,36 @@ public class TestShuffle {
 
   }
 
+  @Test(timeout = 10000)
+  public void testKillSelf() throws IOException, InterruptedException {
+    InputContext inputContext = createTezInputContext();
+    TezConfiguration conf = new TezConfiguration();
+    conf.setLong(Constants.TEZ_RUNTIME_TASK_MEMORY, 300000l);
+    Shuffle shuffle = new Shuffle(inputContext, conf, 1, 3000000l);
+    try {
+      shuffle.run();
+      ShuffleScheduler scheduler = shuffle.scheduler;
+      assertFalse("scheduler.isShutdown should be false", scheduler.isShutdown());
+
+      // killSelf() would invoke close(). Internally Shuffle --> merge.close() --> finalMerge()
+      // gets called. In MergeManager::finalMerge(), it would throw illegal argument exception as
+      // uniqueIdentifier is not present in inputContext. This is used as means of simulating
+      // exception.
+      scheduler.killSelf(new Exception(), "due to internal error");
+      assertTrue("scheduler.isShutdown should be true", scheduler.isShutdown());
+
+      //killSelf() should not result in reporting failure to AM
+      ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+      ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
+      verify(inputContext, times(0)).reportFailure(eq(TaskFailureType.NON_FATAL),
+          throwableArgumentCaptor.capture(),
+          stringArgumentCaptor.capture());
+    } finally {
+      shuffle.shutdown();
+    }
+
+  }
+
 
   private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);


Mime
View raw message