tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [09/50] [abbrv] tez git commit: TEZ-3362. Delete intermediate data at DAG level for Shuffle Handler
Date Wed, 24 May 2017 21:07:38 GMT
TEZ-3362. Delete intermediate data at DAG level for Shuffle Handler


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fec0c5c5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fec0c5c5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fec0c5c5

Branch: refs/heads/master
Commit: fec0c5c5c5d47dc6b3f5a3bcce5513c9d261dc58
Parents: 613f1e0
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Tue Oct 25 13:20:40 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue Oct 25 13:20:40 2016 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  19 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   2 +-
 .../app/launcher/ContainerLauncherManager.java  |   8 +-
 .../app/launcher/ContainerLauncherWrapper.java  |  11 ++
 .../tez/dag/app/launcher/DagDeleteRunnable.java |  60 +++++++
 .../app/launcher/LocalContainerLauncher.java    |  50 ++++++
 .../app/launcher/TezContainerLauncherImpl.java  |  58 ++++++
 tez-plugins/tez-aux-services/pom.xml            |  29 +++
 .../apache/tez/auxservices/ShuffleHandler.java  |  44 ++++-
 .../tez/auxservices/TestShuffleHandler.java     |  81 +++++++++
 .../tez/auxservices/TestShuffleHandlerJobs.java | 175 +++++++++++++++++++
 .../library/common/shuffle/ShuffleUtils.java    |  18 ++
 .../library/input/OrderedGroupedKVInput.java    |   1 +
 .../runtime/library/input/UnorderedKVInput.java |   1 +
 .../output/OrderedPartitionedKVOutput.java      |   1 +
 .../library/output/UnorderedKVOutput.java       |   1 +
 .../org/apache/tez/test/MiniTezCluster.java     |  18 +-
 17 files changed, 558 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 732fee9..ce344bf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -655,7 +655,24 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
       + "resource.cpu.vcores";
   public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
-  
+
+  /** Boolean value. Instructs the AM to ask nodes to delete a DAG's intermediate data upon DAG completion */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_DAG_DELETE_ENABLED = TEZ_AM_PREFIX
+      + "dag.delete.enabled";
+  public static final boolean TEZ_AM_DAG_DELETE_ENABLED_DEFAULT = false;
+
+  /**
+   * Int value. Thread count of the fixed pool each AM container launcher uses for DAG-deletion requests.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT =
+      TEZ_AM_PREFIX + "dag.deletion.thread-count-limit";
+
+  public static final int TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT = 30;
+
   /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks
across
    * all vertices. Setting it to the same value for all tasks is helpful for container reuse
and 
    * thus good for performance typically. */

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7ca7118..605e6f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -842,7 +842,7 @@ public class DAGAppMaster extends AbstractService {
       DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
       LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id="
+
           cleanupEvent.getDag().getID());
-      containerLauncherManager.dagComplete(cleanupEvent.getDag());
+      containerLauncherManager.dagComplete(cleanupEvent.getDag(), jobTokenSecretManager);
       taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
       nodes.dagComplete(cleanupEvent.getDag());
       containers.dagComplete(cleanupEvent.getDag());

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index f2c1cff..3bbb602 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.Utils;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
@@ -193,8 +194,11 @@ public class ContainerLauncherManager extends AbstractService
     }
   }
 
-  public void dagComplete(DAG dag) {
-    // Nothing required at the moment. Containers are shared across DAGs
+  public void dagComplete(DAG dag, JobTokenSecretManager secretManager) {
+    // Give every configured launcher a chance to clean up per-DAG state.
+    for (int i = 0 ; i < containerLaunchers.length ; i++) {
+      containerLaunchers[i].dagComplete(dag, secretManager);
+    }
   }
 
   public void dagSubmitted() {

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
index 08e287e..5f5f66e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -14,6 +14,8 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
@@ -37,4 +39,13 @@ public class ContainerLauncherWrapper {
   public ContainerLauncher getContainerLauncher() {
     return real;
   }
+
+  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { // forward to built-in launchers
+    if (real instanceof TezContainerLauncherImpl) {
+      ((TezContainerLauncherImpl)real).dagComplete(dag, jobTokenSecretManager);
+    }
+    if (real instanceof LocalContainerLauncher) {
+      ((LocalContainerLauncher)real).dagComplete(dag, jobTokenSecretManager);
+    } // NOTE(review): launchers of any other ContainerLauncher type get no DAG-complete callback
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
new file mode 100644
index 0000000..fefaf69
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+
+import java.net.URL;
+
+class DagDeleteRunnable implements Runnable { // Sends a DAG-complete request to one node's shuffle handler.
+  final NodeId nodeId;
+  final DAG dag;
+  final JobTokenSecretManager jobTokenSecretManager;
+  final String tezDefaultComponentName; // NOTE(review): stored but never read by run()
+  final int shufflePort;
+
+  public DagDeleteRunnable(NodeId nodeId, int shufflePort, DAG currentDag,
+                           JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent)
{
+    this.nodeId = nodeId;
+    this.shufflePort = shufflePort;
+    this.dag = currentDag;
+    this.jobTokenSecretManager = jobTokenSecretMgr;
+    this.tezDefaultComponentName = tezDefaultComponent;
+  }
+  // Best-effort: any failure is logged as a warning and never rethrown.
+  @Override
+  public void run() {
+    try {
+      URL baseURL = ShuffleUtils.constructBaseURIForShuffleHandlerDagComplete(
+          nodeId.getHost(), shufflePort,
+          dag.getID().getApplicationId().toString(), dag.getID().getId(), false);
+      BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection(true, baseURL,
+          ShuffleUtils.getHttpConnectionParams(dag.getConf()), "DAGDelete", jobTokenSecretManager);
+      httpConnection.connect();
+      httpConnection.getInputStream(); // NOTE(review): connection/stream never released; clean up in a finally
+    } catch (Exception e) { // NOTE(review): only e.toString() is logged, so the stack trace is lost
+      TezContainerLauncherImpl.LOG.warn("Could not setup HTTP Connection to the node " +
nodeId.getHost() + " for dag delete "
+          + e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 153b2e0..eb9b459 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,8 +44,12 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -89,6 +93,10 @@ public class LocalContainerLauncher extends ContainerLauncher {
   private final ExecutionContext executionContext;
   private final int numExecutors;
   private final boolean isLocalMode;
+  int shufflePort = ShuffleUtils.UNDEFINED_PORT;
+  private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId,
Integer>();
+  private ExecutorService dagDeleteService;
+  boolean shouldDelete;
 
   private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
       runningContainers =
@@ -137,6 +145,12 @@ public class LocalContainerLauncher extends ContainerLauncher {
       localEnv = Maps.newHashMap();
       AuxiliaryServiceHelper.setServiceDataIntoEnv(
           auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv);
+      try {
+        shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(
+            AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv));
+      } catch (IOException e) {
+        LOG.warn("Could not extract shuffle aux-service port!");
+      }
     } else {
       localEnv = System.getenv();
     }
@@ -147,6 +161,12 @@ public class LocalContainerLauncher extends ContainerLauncher {
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread
#%d")
             .build());
     this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor);
+    dagDeleteService = Executors.newFixedThreadPool(
+        conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT,
+            TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder()
+            .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build());
+    shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED,
+        TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT);
   }
 
   @Override
@@ -170,6 +190,10 @@ public class LocalContainerLauncher extends ContainerLauncher {
       taskExecutorService.shutdownNow();
     }
     callbackExecutor.shutdownNow();
+    if (dagDeleteService != null) {
+      dagDeleteService.shutdown();
+      dagDeleteService = null;
+    }
   }
 
 
@@ -247,6 +271,12 @@ public class LocalContainerLauncher extends ContainerLauncher {
       RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId());
       runningContainers.put(event.getContainerId(), callback);
       Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
+
+      if (isLocalMode && shufflePort != ShuffleUtils.UNDEFINED_PORT) {
+        if(nodeIdShufflePortMap.get(event.getNodeId()) == null) {
+          nodeIdShufflePortMap.put(event.getNodeId(), shufflePort);
+        }
+      }
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());
     }
@@ -383,4 +413,24 @@ public class LocalContainerLauncher extends ContainerLauncher {
     }
   }
 
+  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { // queue async DAG-data deletion per node
+    if (!shouldDelete) { // TEZ_AM_DAG_DELETE_ENABLED, read once at construction
+      return;
+    }
+    String tezDefaultComponentName = // NOTE(review): DagDeleteRunnable stores but never uses this
+        isLocalMode ? TezConstants.getTezUberServicePluginName() :
+        TezConstants.getTezYarnServicePluginName();
+    for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { // NOTE(review): plain HashMap also written in the launch path; confirm same thread
+      NodeId nodeId = entry.getKey();
+      int shufflePort = entry.getValue(); // shadows the shufflePort field
+      //TODO: add check for healthy node
+      if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
+        DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
+            shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName);
+        dagDeleteService.submit(dagDeleteRunnable); // NOTE(review): NPE if invoked after stop has nulled dagDeleteService
+      }
+    }
+    nodeIdShufflePortMap.clear(); // NOTE(review): nodes are re-added only on a new launch, so reused containers' nodes may be missed for later DAGs
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index 1521dcb..0726d86 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -19,9 +19,14 @@
 package org.apache.tez.dag.app.launcher;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -30,8 +35,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -79,6 +89,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
   protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
   private ContainerManagementProtocolProxy cmProxy;
   private AtomicBoolean serviceStopped = new AtomicBoolean(false);
+  private final Map<NodeId, Integer> nodeIdShufflePortMap = new HashMap<NodeId,
Integer>();
+  private ExecutorService dagDeleteService;
 
   private Container getContainer(ContainerOp event) {
     ContainerId id = event.getBaseOperation().getContainerId();
@@ -164,6 +176,23 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
         // it from ASSIGNED to RUNNING state
         getContext().containerLaunched(containerID);
         this.state = ContainerState.RUNNING;
+
+        int shufflePort  = ShuffleUtils.UNDEFINED_PORT;
+        ByteBuffer portInfo =
+            response.getAllServicesMetaData().get(
+                conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+                    TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
+        if (portInfo != null) {
+          DataInputByteBuffer in = new DataInputByteBuffer();
+          in.reset(portInfo);
+          shufflePort = in.readInt();
+        }
+
+        if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
+          if(nodeIdShufflePortMap.get(event.getNodeId()) == null) {
+            nodeIdShufflePortMap.put(event.getNodeId(), shufflePort);
+          }
+        }
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
             + ExceptionUtils.getStackTrace(t);
@@ -301,6 +330,10 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     };
     eventHandlingThread.setName("ContainerLauncher Event Handler");
     eventHandlingThread.start();
+    dagDeleteService = Executors.newFixedThreadPool(
+        conf.getInt(TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT,
+            TezConfiguration.TEZ_AM_DAG_DELETION_THREAD_COUNT_LIMIT_DEFAULT), new ThreadFactoryBuilder()
+            .setDaemon(true).setNameFormat("ShuffleDeleteService #%d").build());
   }
 
   @Override
@@ -315,6 +348,10 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
     if (launcherPool != null) {
       launcherPool.shutdownNow();
     }
+    if (dagDeleteService != null) {
+      dagDeleteService.shutdown();
+      dagDeleteService = null;
+    }
   }
 
   protected EventProcessor createEventProcessor(ContainerOp event) {
@@ -397,4 +434,25 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
       throw new TezUncheckedException(e);
     }
   }
+
+  public void dagComplete(DAG dag, JobTokenSecretManager jobTokenSecretManager) { // queue async DAG-data deletion per node
+    boolean shouldDelete = conf.getBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED,
+        TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED_DEFAULT);
+    if (!shouldDelete) {
+      return;
+    }
+    String tezDefaultComponentName = TezConstants.getTezYarnServicePluginName();
+      for (Map.Entry<NodeId, Integer> entry : nodeIdShufflePortMap.entrySet()) { // NOTE(review): HashMap also filled by container-launch code; use ConcurrentHashMap if threads differ
+        NodeId nodeId = entry.getKey();
+        int shufflePort = entry.getValue();
+        //TODO: add check for healthy node
+        if (shufflePort != ShuffleUtils.UNDEFINED_PORT) {
+          DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
+              shufflePort, dag, jobTokenSecretManager, tezDefaultComponentName);
+          dagDeleteService.submit(dagDeleteRunnable);
+        }
+      }
+      nodeIdShufflePortMap.clear(); // NOTE(review): nodes are re-added only on a new launch, so reused containers' nodes may be missed for later DAGs
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml
index 93fef71..e9fdd3f 100644
--- a/tez-plugins/tez-aux-services/pom.xml
+++ b/tez-plugins/tez-aux-services/pom.xml
@@ -103,6 +103,35 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+   <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+   <dependency>
+     <groupId>org.apache.tez</groupId>
+     <artifactId>tez-tests</artifactId>
+     <scope>test</scope>
+     <type>test-jar</type>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-yarn-server-tests</artifactId>
+     <scope>test</scope>
+     <type>test-jar</type>
+   </dependency>
+  <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-common</artifactId>
+     <scope>test</scope>
+     <type>test-jar</type>
+  </dependency>
+  <dependency>
+     <groupId>org.apache.tez</groupId>
+     <artifactId>tez-dag</artifactId>
+     <scope>test</scope>
+   </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index b00c28f..3799cbe 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -56,7 +56,9 @@ import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -899,6 +901,7 @@ public class ShuffleHandler extends AuxiliaryService {
       final Map<String,List<String>> q =
         new QueryStringDecoder(request.getUri()).getParameters();
       final List<String> keepAliveList = q.get("keepAlive");
+      final List<String> dagCompletedQ = q.get("dagCompleted");
       boolean keepAliveParam = false;
       if (keepAliveList != null && keepAliveList.size() == 1) {
         keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
@@ -919,7 +922,10 @@ public class ShuffleHandler extends AuxiliaryService {
             "\n  dagId: " + dagIdQ +
             "\n  keepAlive: " + keepAliveParam);
       }
-
+      // If the request is for Dag Deletion, process the request and send OK.
+      if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ))  {
+        return;
+      }
       if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) {
         sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
         return;
@@ -993,6 +999,26 @@ public class ShuffleHandler extends AuxiliaryService {
       }
     }
 
+    private boolean deleteDagDirectories(MessageEvent evt, // returns true iff handled as a DAG-delete request (reply sent)
+                                         List<String> dagCompletedQ, List<String>
jobQ,
+                                         List<String> dagIdQ) { // NOTE(review): confirm the caller authenticates the request before this deletes
+      if (dagCompletedQ != null && !dagCompletedQ.isEmpty()) { // NOTE(review): value is ignored, so dagCompleted=false also deletes
+        String base = getDagLocation(jobQ.get(0), dagIdQ.get(0), userRsrc.get(jobQ.get(0))); // NOTE(review): jobQ/dagIdQ/user unchecked; dagId unvalidated in path
+        try {
+          LocalFileSystem lfs = FileSystem.getLocal(conf);
+          for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(base, conf)) {
+            lfs.delete(dagPath, true); // recursive delete of each local path returned for this DAG dir
+          }
+        } catch (IOException e) { // best-effort: failure is logged and OK is still returned
+          LOG.warn("Encountered exception during dag delete "+ e);
+        }
+        evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK));
+        evt.getChannel().close(); // NOTE(review): closes without waiting for the (presumably async) write; consider closing on write completion
+        return true;
+      }
+      return false;
+    }
+
     /**
      * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
      * and increments it. This method is first called by messageReceived()
@@ -1050,16 +1076,22 @@ public class ShuffleHandler extends AuxiliaryService {
     }
 
     private String getBaseLocation(String jobId, String dagId, String user) {
+      final String baseStr = // <dag location>output/
+          getDagLocation(jobId, dagId, user) + "output" + Path.SEPARATOR;
+      return baseStr;
+    }
+    // Relative path of one DAG's directory; shared by shuffle output lookup and DAG deletion.
+    private String getDagLocation(String jobId, String dagId, String user) {
       final JobID jobID = JobID.forName(jobId);
       final ApplicationId appID =
           ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
-            jobID.getId());
-      final String baseStr =
+              jobID.getId());
+      final String dagStr = // USERCACHE/<user>/APPCACHE/<appId>/<DAG_PREFIX><dagId>/
           USERCACHE + Path.SEPARATOR + user + Path.SEPARATOR
               + APPCACHE + Path.SEPARATOR
-              + appID.toString() + Path.SEPARATOR + Constants.DAG_PREFIX +
-              dagId + Path.SEPARATOR + "output" + Path.SEPARATOR;
-      return baseStr;
+              + appID.toString() + Path.SEPARATOR + Constants.DAG_PREFIX + dagId // NOTE(review): dagId comes from the request unvalidated
+              + Path.SEPARATOR;
+      return dagStr;
     }
 
     protected MapOutputInfo getMapOutputInfo(String dagId, String mapId,

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 00db3c4..3d622e6 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -1063,6 +1063,87 @@ public class TestShuffleHandler {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testDagDelete() throws Exception { // a dagCompleted=true request must remove the DAG directory
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "simple");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target", TestShuffleHandler.class.
+        getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+                                   HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error(message));
+              ctx.getChannel().close();
+            }
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                  ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&dag=1&dagCompleted=true"); // no map/reduce ids and no URL-hash header
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      String dagDirStr =
+          StringUtils.join(Path.SEPARATOR,
+              new String[] { absLogDir.getAbsolutePath(),
+                  ShuffleHandler.USERCACHE, user,
+                  ShuffleHandler.APPCACHE, appId.toString(),"dag_1/"});
+      File dagDir = new File(dagDirStr);
+      Assert.assertTrue("Dag Directory does not exist!", dagDir.exists());
+      conn.connect();
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        is.close();
+        Assert.assertFalse("Dag Directory was not deleted!", dagDir.exists());
+      } catch (EOFException e) { // NOTE(review): on EOF the deletion assertion above is skipped
+        // ignore
+      }
+      Assert.assertEquals("sendError called due to shuffle error",
+          0, failures.size());
+    } finally {
+      shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
+    }
+  }
+
   @Test(timeout = 4000)
   public void testSendMapCount() throws Exception {
     final List<ShuffleHandler.ReduceMapFileCount> listenerList =

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
new file mode 100644
index 0000000..c409bf8
--- /dev/null
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.auxservices;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .GetApplicationReportRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.examples.OrderedWordCount;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import static org.apache.tez.test.TestTezJobs.generateOrderedWordCountInput;
+import static org.apache.tez.test.TestTezJobs.verifyOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestShuffleHandlerJobs {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandlerJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+  private static int NUM_NMS = 5;
+  private static int NUM_DNS = 5;
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 22);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DNS)
+          .format(true).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS,
+          1, 1);
+      Configuration conf = new Configuration();
+      conf.set(YarnConfiguration.NM_AUX_SERVICES,
+          ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+      String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+          ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+      conf.set(serviceStr, ShuffleHandler.class.getName());
+      conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+      mrrTezCluster.init(conf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+  @Test(timeout = 60000)
+  public void testOrderedWordCount() throws Exception {
+    String inputDirStr = "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+    remoteFs.mkdirs(inputDir);
+    remoteFs.mkdirs(stagingDirPath);
+    generateOrderedWordCountInput(inputDir, remoteFs);
+
+    String outputDirStr = "/tmp/owc-output/";
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    TezClient tezSession = TezClient.create("WordCountTest", tezConf);
+    tezSession.start();
+    try {
+      final OrderedWordCount job = new OrderedWordCount();
+      Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{"-counter",
+              inputDirStr, outputDirStr, "10"}, tezSession)==0);
+      verifyOutput(outputDir, remoteFs);
+      tezSession.stop();
+      ClientRMService rmService = mrrTezCluster.getResourceManager().getClientRMService();
+      boolean isAppComplete = false;
+      while(!isAppComplete) {
+        GetApplicationReportResponse resp = rmService.getApplicationReport(
+            new GetApplicationReportRequest() {
+              @Override
+              public ApplicationId getApplicationId() {
+                return job.getAppId();
+              }
+
+              @Override
+              public void setApplicationId(ApplicationId applicationId) {
+              }
+            });
+        if (resp.getApplicationReport().getYarnApplicationState() == YarnApplicationState.FINISHED)
{
+          isAppComplete = true;
+        }
+        Thread.sleep(100);
+      }
+      for(int i = 0; i < NUM_NMS; i++) {
+        String appPath = mrrTezCluster.getTestWorkDir() + "/" + this.getClass().getName()
+            + "-localDir-nm-" + i + "_0/usercache/" + UserGroupInformation.getCurrentUser().getUserName()
+            + "/appcache/" + job.getAppId();
+        String dagPathStr = appPath + "/dag_1";
+
+        File fs = new File(dagPathStr);
+        Assert.assertFalse(fs.exists());
+        fs = new File(appPath);
+        Assert.assertTrue(fs.exists());
+      }
+    } finally {
+      remoteFs.delete(stagingDirPath, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 5d2444c..ed2e26e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -76,6 +76,7 @@ public class ShuffleUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
   private static final long MB = 1024l * 1024l;
 
+  /** Sentinel port value (-1). NOTE(review): users are outside this chunk; confirm -1 = unset. */
+  public static final int UNDEFINED_PORT = -1;
   //Shared by multiple threads
   private static volatile SSLFactory sslFactory;
 
@@ -222,6 +223,23 @@ public class ShuffleUtils {
     return sb;
   }
 
+  /**
+   * Builds the ShuffleHandler URL that reports a DAG as completed, of the form
+   * {@code <scheme>://<host>:<port>/mapOutput?job=<jobId>&dag=<dagIdentifier>&dagCompleted=true}.
+   *
+   * @param host          host running the shuffle handler
+   * @param port          shuffle handler port
+   * @param appId         application id string; every "application" substring becomes "job"
+   * @param dagIdentifier value written into the {@code dag} query parameter
+   * @param sslShuffle    {@code true} for https, {@code false} for http
+   * @return the assembled URL
+   * @throws MalformedURLException if the assembled string is not a valid URL
+   */
+  public static URL constructBaseURIForShuffleHandlerDagComplete(
+      String host, int port, String appId, int dagIdentifier, boolean sslShuffle)
+      throws MalformedURLException {
+    String scheme;
+    if (sslShuffle) {
+      scheme = "https://";
+    } else {
+      scheme = "http://";
+    }
+    String url = new StringBuilder(scheme)
+        .append(host).append(":").append(port)
+        .append("/").append("mapOutput?job=")
+        .append(appId.replace("application", "job"))
+        .append("&dag=").append(dagIdentifier)
+        .append("&dagCompleted=true")
+        .toString();
+    return new URL(url);
+  }
+
   public static URL constructInputURL(String baseURI,
       Collection<InputAttemptIdentifier> inputs, boolean keepAlive) throws MalformedURLException
{
     StringBuilder url = new StringBuilder(baseURI);

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 9a2a23e..69506af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -390,6 +390,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
     confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 2d6683a..6a9dd62 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -283,6 +283,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
     confKeys.add(Constants.TEZ_RUNTIME_TASK_MEMORY);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 13e27eb..6ebcac8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -245,6 +245,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 4f74f7d..db6ecda 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -173,6 +173,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS);
+    confKeys.add(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/fec0c5c5/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index d7319c5..c727a8f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -173,15 +173,15 @@ public class MiniTezCluster extends MiniYARNCluster {
     conf.set(MRConfig.MASTER_ADDRESS, "test");
 
     //configure the shuffle service in NM
-    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
-        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
-    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
-        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
-        Service.class);
-
-    // Non-standard shuffle port
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
-
+    if (conf.get(YarnConfiguration.NM_AUX_SERVICES) == null) {
+      conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+          new String[]{ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID});
+      conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+          ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+          Service.class);
+      // Non-standard shuffle port
+      conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    }
     conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
         DefaultContainerExecutor.class, ContainerExecutor.class);
 


Mime
View raw message