tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-1216. Clean up the staging directory when the application completes. (hitesh) This closes #3
Date Tue, 12 Aug 2014 21:21:00 GMT
Repository: tez
Updated Branches:
  refs/heads/master 33982e1c5 -> 5cc880c7b


TEZ-1216. Clean up the staging directory when the application completes. (hitesh)
This closes #3


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5cc880c7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5cc880c7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5cc880c7

Branch: refs/heads/master
Commit: 5cc880c7bb0098055830c0473d261bef50f42cd6
Parents: 33982e1
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Aug 12 14:20:08 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Aug 12 14:20:08 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  5 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 57 ++++++++++++++++++--
 .../dag/app/rm/LocalTaskSchedulerService.java   |  6 +++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  3 ++
 .../tez/dag/app/rm/TaskSchedulerService.java    |  5 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |  8 +++
 6 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 60d840a..f13219a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -43,6 +43,11 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_PREFIX = TEZ_PREFIX + "am.";
   public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
 
+  /** Whether to delete all the tez framework scratch data in staging dir on completion */
+  public static final String TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE = TEZ_AM_PREFIX +
+      "staging.scratch-data.auto-delete";
+  public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;
+
   /** The staging dir used while submitting DAGs */
   public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
   public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";

http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 135b252..68d2443 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -223,6 +223,7 @@ public class DAGAppMaster extends AbstractService {
 
   private DAGAppMasterShutdownHandler shutdownHandler =
       new DAGAppMasterShutdownHandler();
+  private final AtomicBoolean shutdownHandlerRunning = new AtomicBoolean(false);
 
   private DAGAppMasterState state;
 
@@ -307,6 +308,8 @@ public class DAGAppMaster extends AbstractService {
        appMasterUgi = UserGroupInformation.getCurrentUser();
     }
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+    String strAppId = this.appAttemptID.getApplicationId().toString();
+    this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
 
     dispatcher = createDispatcher();
     context = new RunningAppContext(conf);
@@ -386,8 +389,6 @@ public class DAGAppMaster extends AbstractService {
     this.sessionTimeoutInterval = 1000 * amConf.getInt(
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
-    String strAppId = this.appAttemptID.getApplicationId().toString();
-    this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
     recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
     recoveryFS = recoveryDataDir.getFileSystem(conf);
     currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
@@ -577,11 +578,13 @@ public class DAGAppMaster extends AbstractService {
     }
 
     public void shutdown(boolean now) {
+      LOG.info("DAGAppMasterShutdownHandler invoked");
       if(!shutdownHandled.compareAndSet(false, true)) {
         LOG.info("Ignoring multiple shutdown events");
         return;
       }
 
+      shutdownHandlerRunning.set(true);
       LOG.info("Handling DAGAppMaster shutdown");
 
       AMShutdownRunnable r = new AMShutdownRunnable(now);
@@ -602,6 +605,7 @@ public class DAGAppMaster extends AbstractService {
         // final states. Will be removed once RM come on. TEZ-160.
         if (!immediateShutdown) {
           try {
+            LOG.info("Sleeping for 5 seconds before shutting down");
             Thread.sleep(5000);
           } catch (InterruptedException e) {
             e.printStackTrace();
@@ -614,6 +618,11 @@ public class DAGAppMaster extends AbstractService {
           LOG.info("Calling stop for all the services");
           stop();
 
+          synchronized (shutdownHandlerRunning) {
+            shutdownHandlerRunning.set(false);
+            shutdownHandlerRunning.notify();
+          }
+
           //Bring the process down by force.
           //Not needed after HADOOP-7140
           LOG.info("Exiting DAGAppMaster..GoodBye!");
@@ -1401,6 +1410,7 @@ public class DAGAppMaster extends AbstractService {
       }
       Exception ex = ServiceOperations.stopQuietly(LOG, service);
       if (ex != null && firstException == null) {
+        LOG.warn("Failed to stop service, name=" + service.getName(), ex);
         firstException = ex;
       }
     }
@@ -1531,6 +1541,36 @@ public class DAGAppMaster extends AbstractService {
       this.dagSubmissionTimer.cancel();
     }
     stopServices();
+
+    // Given pre-emption, we should delete tez scratch dir only if unregister is
+    // successful
+    boolean deleteTezScratchData = this.amConf.getBoolean(
+        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
+        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+          + deleteTezScratchData);
+    }
+    if (deleteTezScratchData && this.taskSchedulerEventHandler != null
+        && this.taskSchedulerEventHandler.hasUnregistered()) {
+      // Delete tez scratch data dir
+      if (this.tezSystemStagingDir != null) {
+        try {
+          FileSystem fs = this.tezSystemStagingDir.getFileSystem(this.amConf);
+          boolean deletedStagingDir = fs.delete(this.tezSystemStagingDir, true);
+          if (!deletedStagingDir) {
+            LOG.warn("Failed to delete tez scratch data dir, path=" + this.tezSystemStagingDir);
+          } else {
+            LOG.info("Completed deletion of tez scratch data dir, path="
+                + this.tezSystemStagingDir);
+          }
+        } catch (IOException e) {
+          // Best effort to delete tez scratch data dir
+          LOG.warn("Failed to delete tez scratch data dir", e);
+        }
+      }
+    }
+
     super.serviceStop();
   }
 
@@ -1688,10 +1728,21 @@ public class DAGAppMaster extends AbstractService {
       this.appMaster = appMaster;
     }
     public void run() {
+      LOG.info("DAGAppMasterShutdownHook invoked");
       if(appMaster.getServiceState() == STATE.STOPPED) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("DAGAppMaster already stopped. Ignoring signal");
         }
+        synchronized (appMaster.shutdownHandlerRunning) {
+          try {
+            if (appMaster.shutdownHandlerRunning.get()) {
+              LOG.info("The shutdown handler is still running, waiting for it to complete");
+              appMaster.shutdownHandlerRunning.wait();
+            }
+          } catch (InterruptedException e) {
+            // Ignore
+          }
+        }
         return;
       }
 
@@ -1712,8 +1763,6 @@ public class DAGAppMaster extends AbstractService {
 
       appMaster.stop();
 
-
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 22f8557..490ffeb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -175,6 +175,12 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   public void setShouldUnregister() {
   }
 
+  @Override
+  public boolean hasUnregistered() {
+    // Should always return true as no multiple attempts in local mode
+    return true;
+  }
+
   static class LocalContainerFactory {
     final AppContext appContext;
     AtomicInteger nextId;

http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index ea845b5..77b6e0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -533,4 +533,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
   }
 
+  public boolean hasUnregistered() {
+    return this.taskScheduler.hasUnregistered();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 823ce47..096069b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -67,7 +67,9 @@ public abstract class TaskSchedulerService extends AbstractService{
   public abstract Object deallocateContainer(ContainerId containerId);
 
   public abstract void setShouldUnregister();
-  
+
+  public abstract boolean hasUnregistered();
+
   public interface TaskSchedulerAppCallback {
     public class AppFinalStatus {
       public final FinalApplicationStatus exitStatus;
@@ -102,5 +104,6 @@ public abstract class TaskSchedulerService extends AbstractService{
     public float getProgress();
     public void preemptContainer(ContainerId containerId);
     public AppFinalStatus getFinalAppStatus();
+
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5cc880c7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 78de101..5bd476c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -128,6 +128,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   final int appHostPort;
   final String appTrackingUrl;
   final AppContext appContext;
+  private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
 
   AtomicBoolean isStopped = new AtomicBoolean(false);
 
@@ -273,6 +274,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     this.shouldUnregister.set(true);
   }
 
+  @Override
+  public boolean hasUnregistered() {
+    return hasUnregistered.get();
+  }
+
   // AbstractService methods
   @Override
   public synchronized void serviceInit(Configuration conf) {
@@ -379,6 +385,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
           amRmClient.unregisterApplicationMaster(status.exitStatus,
               status.exitMessage,
               status.postCompletionTrackingUrl);
+          LOG.info("Successfully unregistered application from RM");
+          hasUnregistered.set(true);
         }
       }
 


Mime
View raw message