tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. (sseth)
Date Fri, 11 Nov 2016 18:11:40 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 aff0edb87 -> b5650a492


TEZ-3534. Differentiate thread names on Fetchers, minor changes to
shuffle shutdown code. (sseth)

(cherry picked from commit 0d598442640ff3cb6ca52f2077bfddb62b54e628)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b5650a49
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b5650a49
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b5650a49

Branch: refs/heads/branch-0.8
Commit: b5650a492de1330954367872489f7aadc980c93d
Parents: aff0edb
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Nov 11 10:11:26 2016 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Nov 11 10:12:55 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/impl/ShuffleManager.java     | 10 ++++--
 .../common/shuffle/orderedgrouped/Shuffle.java  |  8 ++++-
 .../orderedgrouped/ShuffleScheduler.java        | 33 ++++++++++++++++----
 .../orderedgrouped/TestShuffleScheduler.java    |  4 ---
 5 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a21554b..7023d0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
   TEZ-3491. Tez job can hang due to container priority inversion.
   TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
   TEZ-3493. DAG submit timeout cannot be set to a month

http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 0cb17e6..84c0779 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -230,7 +230,7 @@ public class ShuffleManager implements FetcherCallback {
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
         numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+            .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
     ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
@@ -799,7 +799,13 @@ public class ShuffleManager implements FetcherCallback {
       try {
         wakeLoop.signal(); // signal the fetch-scheduler
         for (Fetcher fetcher : runningFetchers) {
-          fetcher.shutdown(); // This could be parallelized.
+          try {
+            fetcher.shutdown(); // This could be parallelized.
+          } catch (Exception e) {
+            LOG.warn(
+                "Error while stopping fetcher during shutdown. Ignoring and continuing. Message={}",
+                e.getMessage());
+          }
         }
       } finally {
         lock.unlock();

http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 5a18959..e5f4e5c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -378,7 +378,13 @@ public class Shuffle implements ExceptionReporter {
       if (eventHandler != null) {
         eventHandler.logProgress(true);
       }
-      cleanupShuffleSchedulerIgnoreErrors();
+      try {
+        cleanupShuffleSchedulerIgnoreErrors();
+      } catch (Exception e) {
+        LOG.warn(
+            "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}",
+            e.getMessage());
+      }
       cleanupMerger(true);
     } catch (Throwable t) {
       LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);

http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 61dc456..84da654 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -376,7 +376,7 @@ class ShuffleScheduler {
 
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
         new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher {" + srcNameTrimmed + "} #%d").build());
+            .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
 
     this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
@@ -429,10 +429,15 @@ class ShuffleScheduler {
     schedulerCallable.call();
   }
 
-  public void close() throws InterruptedException {
+  public void close() {
     try {
       if (!isShutdown.getAndSet(true)) {
-        logProgress();
+        try {
+          logProgress();
+        } catch (Exception e) {
+          LOG.warn("Failed log progress while closing, ignoring and continuing shutdown. Message={}",
+              e.getMessage());
+        }
 
         // Notify and interrupt the waiting scheduler thread
         synchronized (this) {
@@ -450,12 +455,28 @@ class ShuffleScheduler {
 
         // Interrupt the fetchers.
         for (FetcherOrderedGrouped fetcher : runningFetchers) {
-          fetcher.shutDown();
+          try {
+            fetcher.shutDown();
+          } catch (Exception e) {
+            LOG.warn(
+                "Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}",
+                e.getMessage());
+          }
         }
 
         // Kill the Referee thread.
-        referee.interrupt();
-        referee.join();
+        try {
+          referee.interrupt();
+          referee.join();
+        } catch (InterruptedException e) {
+          LOG.warn(
+              "Interrupted while shutting down referee. Ignoring and continuing shutdown");
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.warn(
+              "Error while shutting down referee. Ignoring and continuing shutdown. Message={}",
+              e.getMessage());
+        }
       }
     } finally {
       long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/tez/blob/b5650a49/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 5b6c59f..31da4d0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -881,11 +881,7 @@ public class TestShuffleScheduler {
       // Close the scheduler on different thread to trigger interrupt
       Thread thread = new Thread(new Runnable() {
         @Override public void run() {
-          try {
             scheduler.close();
-          } catch (InterruptedException e) {
-            //ignore
-          }
         }
       });
       thread.start();


Mime
View raw message