tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3533. ShuffleScheduler should shutdown threadpool on exit (rbalamohan)
Date Wed, 09 Nov 2016 02:14:44 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 d9a698e49 -> 5280491d4


TEZ-3533. ShuffleScheduler should shutdown threadpool on exit (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5280491d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5280491d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5280491d

Branch: refs/heads/branch-0.8
Commit: 5280491d4ce84806305bfbe53309c0aa817cf3d7
Parents: d9a698e
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Nov 9 07:44:24 2016 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Nov 9 07:44:24 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/orderedgrouped/Shuffle.java  |  2 +
 .../orderedgrouped/ShuffleScheduler.java        | 59 +++++++++++------
 .../orderedgrouped/TestShuffleScheduler.java    | 67 ++++++++++++++++++++
 4 files changed, 108 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5280491d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 884b1cf..db48f91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3533. ShuffleScheduler should shutdown threadpool on exit.
   TEZ-3493. DAG submit timeout cannot be set to a month
   TEZ-3505. Move license to the file header for TezBytesWritableSerialization
   TEZ-3486. COMBINE_OUTPUT_RECORDS/COMBINE_INPUT_RECORDS are not correct

http://git-wip-us.apache.org/repos/asf/tez/blob/5280491d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 37269ad..5a18959 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -291,6 +291,8 @@ public class Shuffle implements ExceptionReporter {
           scheduler.start();
         } catch (Throwable e) {
           throw new ShuffleError("Error during shuffle", e);
+        } finally {
+          cleanupShuffleScheduler();
         }
       }
       // The ShuffleScheduler may have exited cleanly as a result of a shutdown invocation

http://git-wip-us.apache.org/repos/asf/tez/blob/5280491d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index c017efb..61dc456 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -430,35 +430,52 @@ class ShuffleScheduler {
   }
 
   public void close() throws InterruptedException {
-    if (!isShutdown.getAndSet(true)) {
+    try {
+      if (!isShutdown.getAndSet(true)) {
+        logProgress();
 
-      logProgress();
+        // Notify and interrupt the waiting scheduler thread
+        synchronized (this) {
+          notifyAll();
+        }
+        // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread.
+        // If this is invoked on the same thread, then the shuffleRunner has already complete, and there's
+        // no point interrupting it.
+        // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can
+        // exit.
+        if (shuffleSchedulerThread != null && !Thread.currentThread()
+            .equals(shuffleSchedulerThread)) {
+          shuffleSchedulerThread.interrupt();
+        }
 
-      // Notify and interrupt the waiting scheduler thread
-      synchronized (this) {
-        notifyAll();
-      }
-      // Interrupt the ShuffleScheduler thread only if the close is invoked by another thread.
-      // If this is invoked on the same thread, then the shuffleRunner has already complete, and there's
-      // no point interrupting it.
-      // The interrupt is needed to unblock any merges or waits which may be happening, so that the thread can
-      // exit.
-      if (shuffleSchedulerThread != null && !Thread.currentThread().equals(shuffleSchedulerThread)) {
-        shuffleSchedulerThread.interrupt();
-      }
+        // Interrupt the fetchers.
+        for (FetcherOrderedGrouped fetcher : runningFetchers) {
+          fetcher.shutDown();
+        }
 
-      // Interrupt the fetchers.
-      for (FetcherOrderedGrouped fetcher : runningFetchers) {
-        fetcher.shutDown();
+        // Kill the Referee thread.
+        referee.interrupt();
+        referee.join();
       }
-
-      // Kill the Referee thread.
-      referee.interrupt();
-      referee.join();
+    } finally {
+      long startTime = System.currentTimeMillis();
+      if (!fetcherExecutor.isShutdown()) {
+        // Ensure that fetchers respond to cancel request.
+        fetcherExecutor.shutdownNow();
+      }
+      long endTime = System.currentTimeMillis();
+      LOG.info("Shutting down fetchers for input: {}, shutdown timetaken: {} ms, "
+              + "hasFetcherExecutorStopped: {}", srcNameTrimmed,
+          (endTime - startTime), hasFetcherExecutorStopped());
     }
   }
 
   @VisibleForTesting
+  boolean hasFetcherExecutorStopped() {
+    return fetcherExecutor.isShutdown();
+  }
+
+  @VisibleForTesting
   public boolean isShutdown() {
     return isShutdown.get();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/5280491d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 15cfa48..5b6c59f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -830,6 +830,73 @@ public class TestShuffleScheduler {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testShutdownWithInterrupt() throws Exception {
+    InputContext inputContext = createTezInputContext();
+    Configuration conf = new TezConfiguration();
+    int numInputs = 10;
+    Shuffle shuffle = mock(Shuffle.class);
+    MergeManager mergeManager = mock(MergeManager.class);
+
+    final ShuffleSchedulerForTest scheduler =
+        new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+            mergeManager,
+            System.currentTimeMillis(), null, false, 0, "srcName");
+
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+
+    Future<Void> executorFuture = executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        scheduler.start();
+        return null;
+      }
+    });
+
+    InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
+
+    for (int i = 0; i < numInputs; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(i, 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
+      identifiers[i] = inputAttemptIdentifier;
+    }
+
+    MapHost[] mapHosts = new MapHost[numInputs];
+    int count = 0;
+    for (MapHost mh : scheduler.mapLocations.values()) {
+      mapHosts[count++] = mh;
+    }
+
+    // Copy succeeded for 1 less host
+    for (int i = 0; i < numInputs - 1; i++) {
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
+      scheduler.freeHost(mapHosts[i]);
+    }
+
+    try {
+      // Close the scheduler on different thread to trigger interrupt
+      Thread thread = new Thread(new Runnable() {
+        @Override public void run() {
+          try {
+            scheduler.close();
+          } catch (InterruptedException e) {
+            //ignore
+          }
+        }
+      });
+      thread.start();
+      thread.join();
+    } finally {
+      assertTrue("Fetcher executor should be shutdown, but still running",
+          scheduler.hasFetcherExecutorStopped());
+      executor.shutdownNow();
+    }
+  }
+
 
   private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);


Mime
View raw message