tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2186. OOM with a simple scatter gather job with re-use (rbalamohan)
Date Sun, 22 Mar 2015 23:40:56 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2703f0b08 -> cbd7323cd


TEZ-2186. OOM with a simple scatter gather job with re-use (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cbd7323c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cbd7323c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cbd7323c

Branch: refs/heads/master
Commit: cbd7323cd9027e2f1459ba477c7f01c99306ba31
Parents: 2703f0b
Author: Rajesh Balamohan <rbalamohan@hortonworks.com>
Authored: Mon Mar 23 05:11:04 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@hortonworks.com>
Committed: Mon Mar 23 05:11:04 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../common/shuffle/orderedgrouped/Shuffle.java  | 52 +++++++++++---------
 2 files changed, 31 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cbd7323c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e11008..4562d9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2186. OOM with a simple scatter gather job with re-use
   TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt
   TEZ-2210. Record DAG AM CPU usage stats
   TEZ-2203. Intern strings in tez counters

http://git-wip-us.apache.org/repos/asf/tez/blob/cbd7323c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f7b45a7..f9f4ec2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -20,7 +20,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -331,14 +330,18 @@ public class Shuffle implements ExceptionReporter {
     protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException
{
 
       synchronized (this) {
-        for (int i = 0; i < numFetchers; ++i) {
-          FetcherOrderedGrouped
-              fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
-            metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
-            codec, inputContext, conf, localDiskFetchEnabled,
-              inputContext.getExecutionContext().getHostName());
-          fetchers.add(fetcher);
-          fetcher.start();
+        synchronized (fetchers) {
+          for (int i = 0; i < numFetchers; ++i) {
+            if (!isShutDown.get()) {
+              FetcherOrderedGrouped
+                fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
+                metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength,
+                codec, inputContext, conf, localDiskFetchEnabled,
+                inputContext.getExecutionContext().getHostName());
+              fetchers.add(fetcher);
+              fetcher.start();
+            }
+          }
         }
       }
 
@@ -386,23 +389,28 @@ public class Shuffle implements ExceptionReporter {
     // Stop the fetcher threads
     InterruptedException ie = null;
     if (!fetchersClosed.getAndSet(true)) {
-      for (FetcherOrderedGrouped fetcher : fetchers) {
-        try {
-          fetcher.shutDown();
-        } catch (InterruptedException e) {
-          if (ignoreErrors) {
-            LOG.info("Interrupted while shutting down fetchers. Ignoring.");
-          } else {
-            if (ie != null) {
-              ie = e;
+      synchronized (fetchers) {
+        for (FetcherOrderedGrouped fetcher : fetchers) {
+          try {
+            fetcher.shutDown();
+            LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() +
", "
+                + "isInterrupted:" + fetcher.isInterrupted());
+          } catch (InterruptedException e) {
+            if (ignoreErrors) {
+              LOG.info("Interrupted while shutting down fetchers. Ignoring.");
             } else {
-              LOG.warn("Ignoring exception while shutting down fetcher since a previous one
was seen and will be thrown "
-                  + e);
+              if (ie != null) {
+                ie = e;
+              } else {
+                LOG.warn(
+                    "Ignoring exception while shutting down fetcher since a previous one
was seen and will be thrown "
+                        + e);
+              }
             }
           }
         }
+        fetchers.clear();
       }
-      fetchers.clear();
       // throw only the first exception while attempting to shutdown.
       if (ie != null) {
         throw ie;
@@ -445,7 +453,7 @@ public class Shuffle implements ExceptionReporter {
       cleanupShuffleScheduler(true);
       cleanupMerger(true);
     } catch (Throwable t) {
-      // Ignore
+      LOG.info("Error in cleaning up.., ", t);
     }
   }
 


Mime
View raw message