tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle (Kuhu Shukla via jeagles)
Date Thu, 04 May 2017 20:33:30 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 23b43386b -> b81592ad6


TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b81592ad
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b81592ad
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b81592ad

Branch: refs/heads/TEZ-3334
Commit: b81592ad656f70fc18e613b77db9fcc633dda006
Parents: 23b4338
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu May 4 15:33:23 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu May 4 15:33:23 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 10 +++----
 .../tez/dag/app/launcher/DeletionTracker.java   | 12 +-------
 .../dag/app/launcher/DeletionTrackerImpl.java   | 29 ++++++++++++++------
 .../app/launcher/LocalContainerLauncher.java    | 27 ++++++++++++------
 .../app/launcher/TezContainerLauncherImpl.java  | 29 +++++++++++++-------
 .../tez/auxservices/TestShuffleHandlerJobs.java |  2 +-
 7 files changed, 66 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index e7f0211..f407ac5 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle
   TEZ-3685. ShuffleHandler completedInputSet off-by-one error
   TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback
   TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6c2fcf0..105c85c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -663,19 +663,19 @@ public class TezConfiguration extends Configuration {
   /** Boolean value. Instructs AM to delete Dag directory upon completion */
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="boolean")
-  public static final String TEZ_AM_DAG_DELETE_ENABLED = TEZ_AM_PREFIX
-      + "dag.delete.enabled";
-  public static final boolean TEZ_AM_DAG_DELETE_ENABLED_DEFAULT = false;
+  public static final String TEZ_AM_DAG_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX
+      + "dag.cleanup.on.completion";
+  public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
 
   /**
    * Int value. Upper limit on the number of threads used to delete DAG directories on nodes.
    */
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="integer")
-  public static final String TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT =
+  public static final String TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT =
       TEZ_AM_PREFIX + "dag.deletion.thread-count-limit";
 
-  public static final int TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT = 30;
+  public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10;
 
   /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
    * all vertices. Setting it to the same value for all tasks is helpful for container reuse and

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
index 0409a30..c12f41e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
@@ -18,27 +18,18 @@
 
 package org.apache.tez.dag.app.launcher;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezDAGID;
 
 public abstract class DeletionTracker {
 
   protected final Configuration conf;
-  protected ExecutorService dagDeleteService;
   protected String pluginName;
 
   public DeletionTracker(Configuration conf, String pluginName) {
     this.conf = conf;
-    dagDeleteService = Executors.newFixedThreadPool(
-        conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT,
-            TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder()
-            .setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build());
     this.pluginName = pluginName;
   }
 
@@ -51,7 +42,6 @@ public abstract class DeletionTracker {
   }
 
   public void shutdown() {
-    dagDeleteService.shutdownNow();
-    dagDeleteService = null;
+    // do nothing
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index 625aabb..f0b2818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -20,7 +20,12 @@ package org.apache.tez.dag.app.launcher;
 
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.records.TezDAGID;
@@ -29,21 +34,21 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 
 public class DeletionTrackerImpl extends DeletionTracker {
-  Map<NodeId, Integer> nodeIdShufflePortMap;
-  String pluginName;
+  private Map<NodeId, Integer> nodeIdShufflePortMap;
+  private ExecutorService dagCleanupService;
 
   public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf, String pluginName) {
     super(conf, pluginName);
     this.nodeIdShufflePortMap = nodeIdShufflePortMap;
+    this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
+        TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleDeleteTracker #%d").build());
   }
 
   @Override
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
-    boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED,
-        TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT);
-    if (!shouldDelete) {
-      return;
-    }
+    super.dagComplete(dag, jobTokenSecretManager);
     for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) {
       NodeId nodeId = entry.getKey();
       int shufflePort = entry.getValue();
@@ -51,7 +56,7 @@ public class DeletionTrackerImpl extends DeletionTracker {
       if (shufflePort != TezRuntimeUtils.INVALID_PORT) {
         DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
             shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName);
-        dagDeleteService.submit(dagDeleteRunnable);
+        dagCleanupService.submit(dagDeleteRunnable);
       }
     }
     nodeIdShufflePortMap.clear();
@@ -65,4 +70,12 @@ public class DeletionTrackerImpl extends DeletionTracker {
       }
     }
   }
+
+  @Override
+  public void shutdown() {
+    if (dagCleanupService != null) {
+      dagCleanupService.shutdownNow();
+      dagCleanupService = null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index c977c6a..9c04781 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -52,6 +52,7 @@ import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -163,12 +164,17 @@ public class LocalContainerLauncher extends ContainerLauncher {
     String tezDefaultComponentName =
         isLocalMode ? TezConstants.getTezUberServicePluginName() :
         TezConstants.getTezYarnServicePluginName();
-    String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
-        TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
-    deletionTracker = ReflectionUtils.createClazzInstance(
-        deletionTrackerClassName,new Class[] {
-            Map.class, Configuration.class, String.class},
-        new Object[] {new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName});
+    boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf)
+        && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION,
+        TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT);
+    if (cleanupDagDataOnComplete) {
+      String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
+          TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
+      deletionTracker = ReflectionUtils.createClazzInstance(
+          deletionTrackerClassName, new Class[]{
+              Map.class, Configuration.class, String.class},
+          new Object[]{new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName});
+    }
   }
 
   @Override
@@ -272,8 +278,9 @@ public class LocalContainerLauncher extends ContainerLauncher {
       RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId());
       runningContainers.put(event.getContainerId(), callback);
       Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
-
-      deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+      if (deletionTracker != null) {
+        deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+      }
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());
     }
@@ -411,7 +418,9 @@ public class LocalContainerLauncher extends ContainerLauncher {
   }
 
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
-    deletionTracker.dagComplete(dag, jobTokenSecretManager);
+    if (deletionTracker != null) {
+      deletionTracker.dagComplete(dag, jobTokenSecretManager);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index f6a6874..c67fc01 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -89,7 +90,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
   protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
   private ContainerManagementProtocolProxy cmProxy;
   private AtomicBoolean serviceStopped = new AtomicBoolean(false);
-  private DeletionTracker deletionTracker;
+  private DeletionTracker deletionTracker = null;
 
   private Container getContainer(ContainerOp event) {
     ContainerId id = event.getBaseOperation().getContainerId();
@@ -192,8 +193,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
         } else {
           LOG.warn("Shuffle port cannot be found since services metadata response is missing");
         }
-
-        deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+        if (deletionTracker != null) {
+          deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+        }
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
             + ExceptionUtils.getStackTrace(t);
@@ -331,12 +333,17 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     };
     eventHandlingThread.setName("ContainerLauncher Event Handler");
     eventHandlingThread.start();
-    String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
-        TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
-    deletionTracker = ReflectionUtils.createClazzInstance(
-        deletionTrackerClassName,new Class[] {
-          Map.class, Configuration.class, String.class},
-        new Object[] {new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()});
+    boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf)
+        && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION,
+        TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT);
+    if (cleanupDagDataOnComplete) {
+      String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
+          TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
+      deletionTracker = ReflectionUtils.createClazzInstance(
+          deletionTrackerClassName, new Class[]{
+              Map.class, Configuration.class, String.class},
+          new Object[]{new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()});
+    }
   }
 
   @Override
@@ -438,7 +445,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
   }
 
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
-    deletionTracker.dagComplete(dag, jobTokenSecretManager);
+    if (deletionTracker != null) {
+      deletionTracker.dagComplete(dag, jobTokenSecretManager);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b81592ad/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
index 73c4c13..27b700a 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
@@ -118,7 +118,7 @@ public class TestShuffleHandlerJobs {
     TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig());
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
-    tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION, true);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
     tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);


Mime
View raw message