tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-1827. MiniTezCluster takes 10 minutes to shut down. (hitesh)
Date Thu, 12 Mar 2015 14:50:44 GMT
Repository: tez
Updated Branches:
  refs/heads/master 19df27786 -> 63e985dfc


TEZ-1827. MiniTezCluster takes 10 minutes to shut down. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63e985df
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63e985df
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63e985df

Branch: refs/heads/master
Commit: 63e985dfcf8bda29c3a988e97256d61a6bb9478c
Parents: 19df277
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Mar 12 07:51:08 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Mar 12 07:51:08 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../main/java/org/apache/tez/dag/api/Scope.java |  2 +
 .../apache/tez/dag/api/TezConfiguration.java    | 10 +++
 .../tez/tests/MiniTezClusterWithTimeline.java   | 74 ++++++++++++++++---
 .../org/apache/tez/test/MiniTezCluster.java     | 78 +++++++++++++++++---
 5 files changed, 146 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d29cb5d..207502b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1827. MiniTezCluster takes 10 minutes to shut down.
   TEZ-2178. YARN-3122 breaks tez compilation with hadoop 2.7.0.
   TEZ-2174. Make task priority available to TaskAttemptListener.
   TEZ-2169. Add NDC context to various threads and pools.

http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
index d862e8f..f638e09 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
@@ -25,4 +25,6 @@ public enum Scope {
   AM,       // can only been set at AM level 
   DAG,      // can been set at AM/DAG level
   VERTEX,   // can been set at AM/DAG/VERTEX level
+  CLIENT,   // Client scope - only applicable on client
+  TEST,     // Test scope - only applicable for testing, for example, MiniTezCluster
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8186f2a..c97999f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1164,4 +1164,14 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_DAG_STATUS_POLLINTERVAL_MS = TEZ_PREFIX
       + "dag.status.pollinterval-ms";
   public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;
+
+  /**
+   * Long value.
+   * Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown.
+   */
+  @ConfigurationScope(Scope.TEST)
+  public static final String TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS =
+      TEZ_PREFIX + "test.minicluster.app.wait.on.shutdown.secs";
+  public static final long TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT = 30;
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
index d48948b..b6e39fa 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
@@ -23,7 +23,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +41,7 @@ import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -57,7 +61,6 @@ import com.google.common.collect.Collections2;
 /**
  * Configures and starts the Tez-specific components in the YARN cluster.
  *
- * When using this mini cluster, the user is expected to
  */
 public class MiniTezClusterWithTimeline extends MiniYARNCluster {
 
@@ -69,6 +72,8 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster {
 
   private Path confFilePath;
 
+  private long maxTimeToWaitForAppsOnShutdown;
+
   public MiniTezClusterWithTimeline(String testName) {
     this(testName, 1);
   }
@@ -103,7 +108,11 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster {
       // nothing defined. set quick delete value
       conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
     }
-    
+
+    maxTimeToWaitForAppsOnShutdown = conf.getLong(
+        TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS,
+        TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT);
+
     File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR);
 
     if (!appJarLocalFile.exists()) {
@@ -219,25 +228,72 @@ public class MiniTezClusterWithTimeline extends MiniYARNCluster {
   }
 
   private void waitForAppsToFinish() {
-    YarnClient yarnClient = YarnClient.createYarnClient(); 
+    long waitStartTime = System.currentTimeMillis();
+    long waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ?
+        -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown));
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
     yarnClient.init(getConfig());
     yarnClient.start();
+    Collection<ApplicationReport> unCompletedApps = null;
     try {
-      while(true) {
+      do {
         List<ApplicationReport> appReports = yarnClient.getApplications();
-        Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){
+        unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>() {
           @Override
           public boolean apply(ApplicationReport appReport) {
             return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
-            YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
-            .contains(appReport.getYarnApplicationState());
+                YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+                .contains(appReport.getYarnApplicationState());
           }
         });
-        if (unCompletedApps.size()==0){
+        if (unCompletedApps.isEmpty()) {
           break;
         }
-        LOG.info("wait for applications to finish in MiniTezClusterWithTimeline");
+        LOG.info("Waiting for applications to finish in MiniTezClusterWithTimeline"
+            + ", incompleteAppsCount=" + unCompletedApps.size());
         Thread.sleep(1000);
+      } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis());
+
+      if (unCompletedApps != null && !unCompletedApps.isEmpty()) {
+        LOG.info("Killing incomplete applications in MiniTezCluster"
+            + ", incompleteAppsCount=" + unCompletedApps.size());
+        Set<ApplicationId> incompleteAppIds =
+            new HashSet<ApplicationId>();
+        for (ApplicationReport appReport : unCompletedApps) {
+          try {
+            LOG.info("Killing application, id=" + appReport.getApplicationId()
+                + ", appName=" + appReport.getName());
+            yarnClient.killApplication(appReport.getApplicationId());
+            incompleteAppIds.add(appReport.getApplicationId());
+          } catch (Exception e) {
+            LOG.warn("Failed to kill app on MiniTezCluster shutdown"
+                + ", appId=" + appReport.getApplicationId()
+                + ", appName=" + appReport.getName());
+          }
+        }
+
+        // Wait for RM to report back that incomplete apps are killed
+        waitStartTime = System.currentTimeMillis();
+        waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ?
+            -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown));
+        do {
+          Iterator<ApplicationId> iter = incompleteAppIds.iterator();
+          while (iter.hasNext()) {
+            ApplicationId applicationId = iter.next();
+            ApplicationReport report = yarnClient.getApplicationReport(applicationId);
+            if (EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED,
+                YarnApplicationState.KILLED).contains(report.getYarnApplicationState())) {
+              iter.remove();
+              LOG.info("Application completed, id=" + report.getApplicationId()
+                  + ", yarnState=" + report.getYarnApplicationState());
+            }
+          }
+          if (incompleteAppIds.isEmpty()) {
+            break;
+          }
+        } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis());
+
       }
     } catch (Exception e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/tez/blob/63e985df/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index 39101eb..1f747b9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -23,7 +23,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +41,7 @@ import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -68,6 +72,8 @@ public class MiniTezCluster extends MiniYARNCluster {
 
   private Path confFilePath;
 
+  private long maxTimeToWaitForAppsOnShutdown;
+
   public MiniTezCluster(String testName) {
     this(testName, 1);
   }
@@ -92,12 +98,16 @@ public class MiniTezCluster extends MiniYARNCluster {
       conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
           "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
     }
-    
+
     if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
       // nothing defined. set quick delete value
       conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
     }
-    
+
+    maxTimeToWaitForAppsOnShutdown = conf.getLong(
+        TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS,
+        TezConfiguration.TEZ_TEST_MINI_CLUSTER_APP_WAIT_ON_SHUTDOWN_SECS_DEFAULT);
+
     File appJarLocalFile = new File(MiniTezCluster.APPJAR);
 
     if (!appJarLocalFile.exists()) {
@@ -108,7 +118,7 @@ public class MiniTezCluster extends MiniYARNCluster {
     } else {
       LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath());
     }
-    
+
     FileSystem fs = FileSystem.get(conf);
     Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
     Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
@@ -213,25 +223,73 @@ public class MiniTezCluster extends MiniYARNCluster {
   }
 
   private void waitForAppsToFinish() {
-    YarnClient yarnClient = YarnClient.createYarnClient(); 
+    long waitStartTime = System.currentTimeMillis();
+    long waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ?
+        -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown));
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
     yarnClient.init(getConfig());
     yarnClient.start();
+    Collection<ApplicationReport> unCompletedApps = null;
     try {
-      while(true) {
+      do {
         List<ApplicationReport> appReports = yarnClient.getApplications();
-        Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){
+        unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>() {
           @Override
           public boolean apply(ApplicationReport appReport) {
             return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
-            YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
-            .contains(appReport.getYarnApplicationState());
+                YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+                .contains(appReport.getYarnApplicationState());
           }
         });
-        if (unCompletedApps.size()==0){
+        if (unCompletedApps.isEmpty()) {
           break;
         }
-        LOG.info("wait for applications to finish in MiniTezCluster");
+        LOG.info("Waiting for applications to finish in MiniTezCluster"
+            + ", incompleteAppsCount=" + unCompletedApps.size());
         Thread.sleep(1000);
+      } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis());
+
+
+      if (unCompletedApps != null && !unCompletedApps.isEmpty()) {
+        LOG.info("Killing incomplete applications in MiniTezCluster"
+            + ", incompleteAppsCount=" + unCompletedApps.size());
+        Set<ApplicationId> incompleteAppIds =
+            new HashSet<ApplicationId>();
+        for (ApplicationReport appReport : unCompletedApps) {
+          try {
+            LOG.info("Killing application, id=" + appReport.getApplicationId()
+                + ", appName=" + appReport.getName());
+            yarnClient.killApplication(appReport.getApplicationId());
+            incompleteAppIds.add(appReport.getApplicationId());
+          } catch (Exception e) {
+            LOG.warn("Failed to kill app on MiniTezCluster shutdown"
+                + ", appId=" + appReport.getApplicationId()
+                + ", appName=" + appReport.getName());
+          }
+        }
+
+        // Wait for RM to report back that incomplete apps are killed
+        waitStartTime = System.currentTimeMillis();
+        waitEndTime = maxTimeToWaitForAppsOnShutdown == -1 ?
+            -1 : (waitStartTime + (1000 * maxTimeToWaitForAppsOnShutdown));
+        do {
+          Iterator<ApplicationId> iter = incompleteAppIds.iterator();
+          while (iter.hasNext()) {
+            ApplicationId applicationId = iter.next();
+            ApplicationReport report = yarnClient.getApplicationReport(applicationId);
+            if (EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED,
+                YarnApplicationState.KILLED).contains(report.getYarnApplicationState())) {
+              iter.remove();
+              LOG.info("Application completed, id=" + report.getApplicationId()
+                  + ", yarnState=" + report.getYarnApplicationState());
+            }
+          }
+          if (incompleteAppIds.isEmpty()) {
+            break;
+          }
+        } while (waitEndTime != -1 && waitEndTime > System.currentTimeMillis());
+
       }
     } catch (Exception e) {
       e.printStackTrace();


Mime
View raw message