tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3509. Make DAG Deletion path based (Kuhu Shukla via jeagles)
Date Fri, 18 Nov 2016 17:35:05 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 85e77b9cc -> bee148439


TEZ-3509. Make DAG Deletion path based (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bee14843
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bee14843
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bee14843

Branch: refs/heads/TEZ-3334
Commit: bee1484394e16b10e55269526e99a54748d838e0
Parents: 85e77b9
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri Nov 18 11:34:46 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri Nov 18 11:34:46 2016 -0600

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  12 +++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   2 +-
 .../app/launcher/ContainerLauncherManager.java  |   7 +-
 .../app/launcher/ContainerLauncherWrapper.java  |   4 +-
 .../tez/dag/app/launcher/DagDeleteRunnable.java |  20 ++--
 .../app/launcher/LocalContainerLauncher.java    |  58 ++++------
 .../app/launcher/TezContainerLauncherImpl.java  |  56 ++++------
 .../apache/tez/auxservices/ShuffleHandler.java  |   8 +-
 .../tez/auxservices/TestShuffleHandler.java     |   2 +-
 .../runtime/library/common/TezRuntimeUtils.java | 106 ++++++++++++++++++-
 .../library/common/shuffle/ShuffleUtils.java    |  91 +---------------
 11 files changed, 185 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ce344bf..22cd80e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1655,6 +1655,18 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT = "org.apache.tez.dag.history.recovery.RecoveryService";
 
   /**
+   * String value that is a class name.
+   * Specify the class to use for Deletion tracking.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty
+  public static final String TEZ_DELETION_TRACKER_CLASS =
+      TEZ_PREFIX + "history.logging.service.class";
+
+  public static final String TEZ_DELETION_TRACKER_CLASS_DEFAULT =
+      "org.apache.tez.dag.app.launcher.DeletionTrackerImpl";
+
+  /**
    * Boolean value. Default false.
    * By default, configured values for the Summary Entity Types for Timeline will
    * not be respected and be overridden by the Timeline History Service.

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 605e6f5..646eefb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -842,7 +842,7 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
      LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
           cleanupEvent.getDag().getID());
-      containerLauncherManager.dagComplete(cleanupEvent.getDag(), jobTokenSecretManager);
+      containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), jobTokenSecretManager);
       taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
       nodes.dagComplete(cleanupEvent.getDag());
       containers.dagComplete(cleanupEvent.getDag());

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 3bbb602..e3f96ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -42,7 +43,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerLauncherContextImpl;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
-import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
 import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
 import org.apache.tez.serviceplugins.api.DagInfo;
@@ -146,7 +146,7 @@ public class ContainerLauncherManager extends AbstractService
                                                 AppContext context,
                                                 TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                                 String workingDirectory,
-                                                boolean isLocalMode) {
+                                                boolean isLocalMode) throws TezException {
     LOG.info("Creating LocalContainerLauncher");
     // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
     // extensive internals which are only available at runtime. Will likely require
@@ -194,8 +194,7 @@ public class ContainerLauncherManager extends AbstractService
     }
   }
 
-  public void dagComplete(DAG dag, JobTokenSecretManager secretManager) {
-
+  public void dagComplete(TezDAGID dag, JobTokenSecretManager secretManager) {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
       containerLaunchers[i].dagComplete(dag, secretManager);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
index 5f5f66e..c70ab10 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -15,7 +15,7 @@
 package org.apache.tez.dag.app.launcher;
 
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
@@ -40,7 +40,7 @@ public class ContainerLauncherWrapper {
     return real;
   }
 
-  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) {
+  public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
     if (real instanceof TezContainerLauncherImpl) {
       ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
index fefaf69..669d539 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
@@ -18,26 +18,30 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.http.BaseHttpConnection;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 
 import java.net.URL;
 
 class DagDeleteRunnable implements Runnable {
   final NodeId nodeId;
-  final DAG dag;
+  final TezDAGID dag;
   final JobTokenSecretManager jobTokenSecretManager;
   final String tezDefaultComponentName;
   final int shufflePort;
+  final Configuration conf;
 
-  public DagDeleteRunnable(NodeId nodeId, int shufflePort, DAG currentDag,
+  public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag,
+                           Configuration conf,
                            JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) {
     this.nodeId = nodeId;
     this.shufflePort = shufflePort;
     this.dag = currentDag;
+    this.conf = conf;
     this.jobTokenSecretManager = jobTokenSecretMgr;
     this.tezDefaultComponentName = tezDefaultComponent;
   }
@@ -45,11 +49,11 @@ class DagDeleteRunnable implements Runnable {
   @Override
   public void run() {
     try {
-      URL baseURL = ShuffleUtils.constructBaseURIForShuffleHandlerDagComplete(
+      URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerDagComplete(
           nodeId.getHost(), shufflePort,
-          dag.getID().getApplicationId().toString(), dag.getID().getId(), false);
-      BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, baseURL,
-          ShuffleUtils.getHttpConnectionParams(dag.getConf()), "DAGDelete", jobTokenSecretManager);
+          dag.getApplicationId().toString(), dag.getId(), false);
+      BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL,
+          TezRuntimeUtils.getHttpConnectionParams(conf), "DAGDelete", jobTokenSecretManager);
       httpConnection.connect();
       httpConnection.getInputStream();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index eb9b459..b6f725c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -45,11 +45,13 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -72,7 +74,6 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.task.TezChild;
 
 
@@ -93,9 +94,8 @@ public class LocalContainerLauncher extends ContainerLauncher {
   private final ExecutionContext executionContext;
   private final int numExecutors;
   private final boolean isLocalMode;
-  int shufflePort = ShuffleUtils.UNDEFINED_PORT;
-  private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>();
-  private ExecutorService dagDeleteService;
+  int shufflePort = TezRuntimeUtils.INVALID_PORT;
+  private DeletionTracker deletionTracker;
   boolean shouldDelete;
 
   private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
@@ -116,7 +116,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
                                 AppContext context,
                                 TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                 String workingDirectory,
-                                boolean isLocalMode) throws UnknownHostException {
+                                boolean isLocalMode) throws UnknownHostException, TezException {
     // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
     // starts up. It's not possible to set these up via a static payload.
     // Will need some kind of mechanism to dynamically crate payloads / bind to parameters
@@ -146,7 +146,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
       AuxiliaryServiceHelper.setServiceDataIntoEnv(
           auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv);
       try {
-        shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(
+        shufflePort = TezRuntimeUtils.deserializeShuffleProviderMetaData(
             AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv));
       } catch (IOException e) {
         LOG.warn("Could not extract shuffle aux-service port!");
@@ -161,12 +161,17 @@ public class LocalContainerLauncher extends ContainerLauncher {
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d")
             .build());
     this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor);
-    dagDeleteService = Executors.newFixedThreadPool(
-        conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT,
-            TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder()
-            .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build());
     shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED,
         TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT);
+    String tezDefaultComponentName =
+        isLocalMode ? TezConstants.getTezUberServicePluginName() :
+        TezConstants.getTezYarnServicePluginName();
+    String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_DELETION_TRACKER_CLASS,
+        TezConfiguration.TEZ_DELETION_TRACKER_CLASS_DEFAULT);
+    deletionTracker = ReflectionUtils.createClazzInstance(
+        deletionTrackerClassName,new Class[] {
+            Map.class, Configuration.class, String.class},
+        new Object[] {new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName});
   }
 
   @Override
@@ -190,9 +195,8 @@ public class LocalContainerLauncher extends ContainerLauncher {
       taskExecutorService.shutdownNow();
     }
     callbackExecutor.shutdownNow();
-    if (dagDeleteService != null) {
-      dagDeleteService.shutdown();
-      dagDeleteService = null;
+    if (deletionTracker != null) {
+      deletionTracker.shutdown();
     }
   }
 
@@ -272,11 +276,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
       runningContainers.put(event.getContainerId(), callback);
       Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
 
-      if (isLocalMode && shufflePort != ShuffleUtils.UNDEFINED_PORT) {
-        if(nodeIdShufflePortMap.get(event.getNodeId()) == null) {
-          nodeIdShufflePortMap.put(event.getNodeId(), shufflePort);
-        }
-      }
+      deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());
     }
@@ -413,24 +413,8 @@ public class LocalContainerLauncher extends ContainerLauncher {
     }
   }
 
-  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) {
-    if (!shouldDelete) {
-      return;
-    }
-    String tezDefaultComponentName =
-        isLocalMode ? TezConstants.getTezUberServicePluginName() :
-        TezConstants.getTezYarnServicePluginName();
-    for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) {
-      NodeId nodeId = entry.getKey();
-      int shufflePort = entry.getValue();
-      //TODO: add check for healthy node
-      if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
-        DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
-            shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName);
-        dagDeleteService.submit(dagDeleteRunnable);
-      }
-    }
-    nodeIdShufflePortMap.clear();
+  public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
+    deletionTracker.dagComplete(dag, jobTokenSecretManager);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index 0726d86..3ad3488 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -37,11 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -89,8 +89,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
   protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
   private ContainerManagementProtocolProxy cmProxy;
   private AtomicBoolean serviceStopped = new AtomicBoolean(false);
-  private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId, Integer>();
-  private ExecutorService dagDeleteService;
+  private DeletionTracker deletionTracker;
 
   private Container getContainer(ContainerOp event) {
     ContainerId id = event.getBaseOperation().getContainerId();
@@ -177,7 +176,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
         getContext().containerLaunched(containerID);
         this.state = ContainerState.RUNNING;
 
-        int shufflePort  = ShuffleUtils.UNDEFINED_PORT;
+        int shufflePort = TezRuntimeUtils.INVALID_PORT;
         ByteBuffer portInfo =
             response.getAllServicesMetaData().get(
                 conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
@@ -188,11 +187,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
           shufflePort = in.readInt();
         }
 
-        if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
-          if(nodeIdShufflePortMap.get(event.getNodeId()) == null) {
-            nodeIdShufflePortMap.put(event.getNodeId(), shufflePort);
-          }
-        }
+        deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
             + ExceptionUtils.getStackTrace(t);
@@ -269,7 +264,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
   }
 
   @Override
-  public void start() {
+  public void start() throws TezException {
     // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
     cmProxy =
         new ContainerManagementProtocolProxy(conf);
@@ -330,10 +325,12 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     };
     eventHandlingThread.setName("ContainerLauncher Event Handler");
     eventHandlingThread.start();
-    dagDeleteService = Executors.newFixedThreadPool(
-        conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT,
-            TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder()
-            .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build());
+    String deletionTrackerClassName = conf.get(TezConfiguration.TEZ_DELETION_TRACKER_CLASS,
+        TezConfiguration.TEZ_DELETION_TRACKER_CLASS_DEFAULT);
+    deletionTracker = ReflectionUtils.createClazzInstance(
+        deletionTrackerClassName,new Class[] {
+          Map.class, Configuration.class, String.class},
+        new Object[] {new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()});
   }
 
   @Override
@@ -348,9 +345,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     if (launcherPool != null) {
       launcherPool.shutdownNow();
     }
-    if (dagDeleteService != null) {
-      dagDeleteService.shutdown();
-      dagDeleteService = null;
+    if (deletionTracker != null) {
+      deletionTracker.shutdown();
     }
   }
 
@@ -435,24 +431,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     }
   }
 
-  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) {
-    boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED,
-        TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT);
-    if (!shouldDelete) {
-      return;
-    }
-    String tezDefaultComponentName = TezConstants.getTezYarnServicePluginName();
-      for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) {
-        NodeId nodeId = entry.getKey();
-        int shufflePort = entry.getValue();
-        //TODO: add check for healthy node
-        if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
-          DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
-              shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName);
-          dagDeleteService.submit(dagDeleteRunnable);
-        }
-      }
-      nodeIdShufflePortMap.clear();
+  public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
+    deletionTracker.dagComplete(dag, jobTokenSecretManager);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 2991a55..fdaba86 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -905,7 +905,7 @@ public class ShuffleHandler extends AuxiliaryService {
       final Map<String,List<String>> q =
         new QueryStringDecoder(request.getUri()).getParameters();
       final List<String> keepAliveList = q.get("keepAlive");
-      final List<String> dagCompletedQ = q.get("dagCompleted");
+      final List<String> dagCompletedQ = q.get("dagAction");
       boolean keepAliveParam = false;
       if (keepAliveList != null && keepAliveList.size() == 1) {
         keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
@@ -1006,7 +1006,11 @@ public class ShuffleHandler extends AuxiliaryService {
     private boolean deleteDagDirectories(MessageEvent evt,
                                          List<String> dagCompletedQ, List<String> jobQ,
                                          List<String> dagIdQ) {
-      if (dagCompletedQ != null && !dagCompletedQ.isEmpty()) {
+      if (jobQ == null || jobQ.isEmpty()) {
+        return false;
+      }
+      if (dagCompletedQ != null && !dagCompletedQ.isEmpty() && dagCompletedQ.get(0).contains("delete")
+          && dagIdQ != null && !dagIdQ.isEmpty()) {
         String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0)));
         try {
           LocalFileSystem lfs = FileSystem.getLocal(conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 3d622e6..ebd9c5d 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -1115,7 +1115,7 @@ public class TestShuffleHandler {
               "http://127.0.0.1:"
                   + shuffleHandler.getConfig().get(
                   ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-                  + "/mapOutput?job=job_12345_0001&dag=1&dagCompleted=true");
+                  + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1");
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index c0b7210..d39d554 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -21,7 +21,17 @@ package org.apache.tez.runtime.library.common;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.ByteBuffer;
 
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.http.async.netty.AsyncHttpConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,7 +50,11 @@ public class TezRuntimeUtils {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(TezRuntimeUtils.class);
-  
+  //Shared by multiple threads
+  private static volatile SSLFactory sslFactory;
+  //ShufflePort by default for ContainerLaunchers
+  public static final int INVALID_PORT = -1;
+
   public static String getTaskIdentifier(String vertexName, int taskIndex) {
     return String.format("%s_%06d", vertexName, taskIndex);
   }
@@ -159,4 +173,94 @@ public class TezRuntimeUtils {
                   TezTaskOutputFiles.class.getName()), e);
     }
   }
+
+  public static URL constructBaseURIForShuffleHandlerDagComplete(
+      String host, int port, String appId, int dagIdentifier, boolean sslShuffle)
+      throws MalformedURLException {
+    final String http_protocol = (sslShuffle) ? "https://" : "http://";
+    StringBuilder sb = new StringBuilder(http_protocol);
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append("/");
+    sb.append("mapOutput?dagAction=delete");
+    sb.append("&job=");
+    sb.append(appId.replace("application", "job"));
+    sb.append("&dag=");
+    sb.append(String.valueOf(dagIdentifier));
+    return new URL(sb.toString());
+  }
+
+  public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
+    int connectionTimeout =
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
+
+    int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
+
+    int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
+
+    boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
+
+    int keepAliveMaxConnections = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
+
+    if (keepAlive) {
+      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+      System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections));
+    }
+
+    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+    if (sslShuffle) {
+      if (sslFactory == null) {
+        synchronized (HttpConnectionParams.class) {
+          //Create sslFactory if it is null or if it was destroyed earlier
+          if (sslFactory == null || sslFactory.getKeystoresFactory().getTrustManagers() == null) {
+            sslFactory =
+                new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf);
+            try {
+              sslFactory.init();
+            } catch (Exception ex) {
+              sslFactory.destroy();
+              sslFactory = null;
+              throw new RuntimeException(ex);
+            }
+          }
+        }
+      }
+    }
+
+    HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive,
+        keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle,
+        sslFactory);
+    return httpConnParams;
+  }
+
+  public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
+                                                     HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
+      throws IOException {
+    if (asyncHttp) {
+      //TODO: support other async packages? httpclient-async?
+      return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager);
+    } else {
+      return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager);
+    }
+  }
+
+  public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    try {
+      in.reset(meta);
+      int port = in.readInt();
+      return port;
+    } finally {
+      in.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bee14843/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index ed2e26e..64a10d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -41,11 +41,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.BaseHttpConnection;
-import org.apache.tez.http.HttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
-import org.apache.tez.http.SSLFactory;
-import org.apache.tez.http.async.netty.AsyncHttpConnection;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -62,7 +60,6 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -76,10 +73,6 @@ public class ShuffleUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
   private static final long MB = 1024l * 1024l;
 
-  public static final int UNDEFINED_PORT = -1;
-  //Shared by multiple threads
-  private static volatile SSLFactory sslFactory;
-
   static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
       new ThreadLocal<DecimalFormat>() {
         @Override
@@ -105,14 +98,7 @@ public class ShuffleUtils {
 
   public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
       throws IOException {
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    try {
-      in.reset(meta);
-      int port = in.readInt();
-      return port;
-    } finally {
-      in.close();
-    }
+    return TezRuntimeUtils.deserializeShuffleProviderMetaData(meta);
   }
 
   public static void shuffleToMemory(byte[] shuffleData,
@@ -223,23 +209,6 @@ public class ShuffleUtils {
     return sb;
   }
 
-  public static URL constructBaseURIForShuffleHandlerDagComplete(
-      String host, int port, String appId, int dagIdentifier, boolean sslShuffle)
-      throws MalformedURLException{
-    final String http_protocol = (sslShuffle) ? "https://" : "http://";
-    StringBuilder sb = new StringBuilder(http_protocol);
-    sb.append(host);
-    sb.append(":");
-    sb.append(port);
-    sb.append("/");
-    sb.append("mapOutput?job=");
-    sb.append(appId.replace("application", "job"));
-    sb.append("&dag=");
-    sb.append(String.valueOf(dagIdentifier));
-    sb.append("&dagCompleted=true");
-    return new URL(sb.toString());
-  }
-
   public static URL constructInputURL(String baseURI,
     Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException {
     StringBuilder url = new StringBuilder(baseURI);
@@ -263,12 +232,7 @@ public class ShuffleUtils {
   public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
       HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
       throws IOException {
-    if (asyncHttp) {
-      //TODO: support other async packages? httpclient-async?
-      return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager);
-    } else {
-      return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager);
-    }
+    return TezRuntimeUtils.getHttpConnection(asyncHttp, url, params, logIdentifier, jobTokenSecretManager);
   }
 
   public static String stringify(DataMovementEventPayloadProto dmProto) {
@@ -589,54 +553,7 @@ public class ShuffleUtils {
    * @return HttpConnectionParams
    */
   public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
-    int connectionTimeout =
-        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
-
-    int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
-
-    int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
-
-    boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
-
-    int keepAliveMaxConnections = conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
-
-    if (keepAlive) {
-      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
-      System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections));
-    }
-
-    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
-
-    if (sslShuffle) {
-      if (sslFactory == null) {
-        synchronized (HttpConnectionParams.class) {
-          //Create sslFactory if it is null or if it was destroyed earlier
-          if (sslFactory == null || sslFactory.getKeystoresFactory().getTrustManagers() == null) {
-            sslFactory =
-                new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf);
-            try {
-              sslFactory.init();
-            } catch (Exception ex) {
-              sslFactory.destroy();
-              sslFactory = null;
-              throw new RuntimeException(ex);
-            }
-          }
-        }
-      }
-    }
-
-    HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive,
-        keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle,
-        sslFactory);
-    return httpConnParams;
+    return TezRuntimeUtils.getHttpConnectionParams(conf);
   }
 
   public static boolean isTezShuffleHandler(Configuration config) {


Mime
View raw message