tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [45/50] [abbrv] tez git commit: TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes. (kshukla)
Date Wed, 24 May 2017 21:08:14 GMT
TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes. (kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/886fac7f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/886fac7f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/886fac7f

Branch: refs/heads/master
Commit: 886fac7f4d98c5c6542c557838ddc2f7b2c5864c
Parents: b81592a
Author: Kuhu Shukla <kshukla@yahoo-inc.com>
Authored: Tue May 16 12:40:56 2017 -0500
Committer: Kuhu Shukla <kshukla@yahoo-inc.com>
Committed: Tue May 16 12:40:56 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../tez/dag/app/launcher/DagDeleteRunnable.java | 29 +++++++++++++++-----
 .../tez/dag/app/launcher/DeletionTracker.java   |  6 ++--
 .../dag/app/launcher/DeletionTrackerImpl.java   | 20 ++++++++++----
 .../app/launcher/LocalContainerLauncher.java    | 19 ++++---------
 .../app/launcher/TezContainerLauncherImpl.java  |  6 ++--
 .../app/rm/container/AMContainerHelpers.java    |  4 +--
 .../dag/app/rm/container/AMContainerImpl.java   |  6 ++--
 .../dag/app/rm/container/AMContainerMap.java    |  6 +++-
 .../dag/app/rm/container/TestAMContainer.java   |  4 ++-
 10 files changed, 60 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index f407ac5..a2afb15 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3725. Cleanup http connections and other unnecessary fields in DAG Deletion tracker classes.
   TEZ-3705. Modify DeletionTracker and deletion threads to be initialized only if enabled for tez_shuffle
   TEZ-3685. ShuffleHandler completedInputSet off-by-one error
   TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
index 6d966b0..eac745e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DagDeleteRunnable.java
@@ -24,41 +24,56 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URL;
 
 class DagDeleteRunnable implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(DagDeleteRunnable.class);
   final NodeId nodeId;
   final TezDAGID dag;
   final JobTokenSecretManager jobTokenSecretManager;
-  final String tezDefaultComponentName;
   final int shufflePort;
   final HttpConnectionParams httpConnectionParams;
 
   public DagDeleteRunnable(NodeId nodeId, int shufflePort, TezDAGID currentDag,
                            HttpConnectionParams httpConnectionParams,
-                           JobTokenSecretManager jobTokenSecretMgr, String tezDefaultComponent) {
+                           JobTokenSecretManager jobTokenSecretMgr) {
     this.nodeId = nodeId;
     this.shufflePort = shufflePort;
     this.dag = currentDag;
     this.httpConnectionParams = httpConnectionParams;
     this.jobTokenSecretManager = jobTokenSecretMgr;
-    this.tezDefaultComponentName = tezDefaultComponent;
   }
 
   @Override
   public void run() {
+    BaseHttpConnection httpConnection = null;
     try {
       URL baseURL = TezRuntimeUtils.constructBaseURIForShuffleHandlerDagComplete(
           nodeId.getHost(), shufflePort,
           dag.getApplicationId().toString(), dag.getId(), false);
-      BaseHttpConnection httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL,
-          httpConnectionParams, "DAGDelete", jobTokenSecretManager);
+      httpConnection = TezRuntimeUtils.getHttpConnection(true, baseURL, httpConnectionParams,
+          "DAGDelete", jobTokenSecretManager);
       httpConnection.connect();
       httpConnection.getInputStream();
     } catch (Exception e) {
-      TezContainerLauncherImpl.LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + " for dag delete "
-          + e);
+      LOG.warn("Could not setup HTTP Connection to the node " + nodeId.getHost() + " for dag delete. ", e);
+    } finally {
+      try {
+        if (httpConnection != null) {
+          httpConnection.cleanup(true);
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Encountered IOException for " + nodeId.getHost() + " during close. ", ioe);
+      }
     }
   }
+
+  @Override
+  public String toString() {
+    return "DagDeleteRunnable nodeId=" + nodeId + ", shufflePort=" + shufflePort + ", dagId=" + dag.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
index c12f41e..27ece70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTracker.java
@@ -26,18 +26,16 @@ import org.apache.tez.dag.records.TezDAGID;
 public abstract class DeletionTracker {
 
   protected final Configuration conf;
-  protected String pluginName;
 
-  public DeletionTracker(Configuration conf, String pluginName) {
+  public DeletionTracker(Configuration conf) {
     this.conf = conf;
-    this.pluginName = pluginName;
   }
 
   public void dagComplete(TezDAGID dag, JobTokenSecretManager jobTokenSecretManager) {
     //do nothing
   }
 
-  public void addNodeShufflePorts(NodeId nodeId, int port) {
+  public void addNodeShufflePort(NodeId nodeId, int port) {
     //do nothing
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
index f0b2818..b7583ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/DeletionTrackerImpl.java
@@ -22,6 +22,7 @@ package org.apache.tez.dag.app.launcher;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -32,13 +33,16 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DeletionTrackerImpl extends DeletionTracker {
+  private static final Logger LOG = LoggerFactory.getLogger(DeletionTrackerImpl.class);
   private Map<NodeId, Integer> nodeIdShufflePortMap;
   private ExecutorService dagCleanupService;
 
-  public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf, String pluginName) {
-    super(conf, pluginName);
+  public DeletionTrackerImpl(Map<NodeId, Integer> nodeIdShufflePortMap, Configuration conf) {
+    super(conf);
     this.nodeIdShufflePortMap = nodeIdShufflePortMap;
     this.dagCleanupService = new ThreadPoolExecutor(0, conf.getInt(TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT,
         TezConfiguration.TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT), 10,
@@ -54,16 +58,20 @@ public class DeletionTrackerImpl extends DeletionTracker {
       int shufflePort = entry.getValue();
       //TODO: add check for healthy node
       if (shufflePort != TezRuntimeUtils.INVALID_PORT) {
-        DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId,
-            shufflePort, dag, TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager, this.pluginName);
-        dagCleanupService.submit(dagDeleteRunnable);
+        DagDeleteRunnable dagDeleteRunnable = new DagDeleteRunnable(nodeId, shufflePort, dag,
+            TezRuntimeUtils.getHttpConnectionParams(conf), jobTokenSecretManager);
+        try {
+          dagCleanupService.submit(dagDeleteRunnable);
+        } catch (RejectedExecutionException rejectedException) {
+          LOG.info("Ignoring deletion request for " + dagDeleteRunnable);
+        }
       }
     }
     nodeIdShufflePortMap.clear();
   }
 
   @Override
-  public void addNodeShufflePorts(NodeId nodeId, int port) {
+  public void addNodeShufflePort(NodeId nodeId, int port) {
     if (port != TezRuntimeUtils.INVALID_PORT) {
       if(nodeIdShufflePortMap.get(nodeId) == null) {
         nodeIdShufflePortMap.put(nodeId, port);

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9c04781..4793bd7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -50,7 +50,6 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
@@ -143,14 +142,9 @@ public class LocalContainerLauncher extends ContainerLauncher {
       String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
           TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
       localEnv = Maps.newHashMap();
+      shufflePort = 0;
       AuxiliaryServiceHelper.setServiceDataIntoEnv(
-          auxiliaryService, ByteBuffer.allocate(4).putInt(0), localEnv);
-      try {
-        shufflePort = TezRuntimeUtils.deserializeShuffleProviderMetaData(
-            AuxiliaryServiceHelper.getServiceDataFromEnv(auxiliaryService, localEnv));
-      } catch (IOException e) {
-        LOG.warn("Could not extract shuffle aux-service port!");
-      }
+          auxiliaryService, ByteBuffer.allocate(4).putInt(shufflePort), localEnv);
     } else {
       localEnv = System.getenv();
     }
@@ -161,9 +155,6 @@ public class LocalContainerLauncher extends ContainerLauncher {
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LocalTaskExecutionThread #%d")
             .build());
     this.taskExecutorService = MoreExecutors.listeningDecorator(rawExecutor);
-    String tezDefaultComponentName =
-        isLocalMode ? TezConstants.getTezUberServicePluginName() :
-        TezConstants.getTezYarnServicePluginName();
     boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf)
         && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION,
         TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT);
@@ -172,8 +163,8 @@ public class LocalContainerLauncher extends ContainerLauncher {
           TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
       deletionTracker = ReflectionUtils.createClazzInstance(
           deletionTrackerClassName, new Class[]{
-              Map.class, Configuration.class, String.class},
-          new Object[]{new HashMap<NodeId, Integer>(), conf, tezDefaultComponentName});
+              Map.class, Configuration.class},
+          new Object[]{new HashMap<NodeId, Integer>(), conf});
     }
   }
 
@@ -279,7 +270,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
       runningContainers.put(event.getContainerId(), callback);
       Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
       if (deletionTracker != null) {
-        deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+        deletionTracker.addNodeShufflePort(event.getNodeId(), shufflePort);
       }
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index c67fc01..922575f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -194,7 +194,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
           LOG.warn("Shuffle port cannot be found since services metadata response is missing");
         }
         if (deletionTracker != null) {
-          deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);
+          deletionTracker.addNodeShufflePort(event.getNodeId(), shufflePort);
         }
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
@@ -341,8 +341,8 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
           TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
       deletionTracker = ReflectionUtils.createClazzInstance(
           deletionTrackerClassName, new Class[]{
-              Map.class, Configuration.class, String.class},
-          new Object[]{new HashMap<NodeId, Integer>(), conf, TezConstants.getTezYarnServicePluginName()});
+              Map.class, Configuration.class},
+          new Object[]{new HashMap<NodeId, Integer>(), conf});
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index ba3ecad..f959a7c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -154,13 +154,11 @@ public class AMContainerHelpers {
       String javaOpts,
       InetSocketAddress taskAttemptListenerAddress, Credentials credentials,
       AppContext appContext, Resource containerResource,
-      Configuration conf) {
+      Configuration conf, String auxiliaryService) {
 
     ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
-        String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
         commonContainerSpec =
             createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 94c8fe0..8e7df32 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -91,6 +91,7 @@ public class AMContainerImpl implements AMContainer {
   private final int schedulerId;
   private final int launcherId;
   private final int taskCommId;
+  private String auxiliaryService;
 
   private final List<TezTaskAttemptID> completedAttempts =
       new LinkedList<TezTaskAttemptID>();
@@ -313,7 +314,7 @@ public class AMContainerImpl implements AMContainer {
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
       TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher signatureMatcher,
-      AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
+      AppContext appContext, int schedulerId, int launcherId, int taskCommId, String auxiliaryService) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -329,6 +330,7 @@ public class AMContainerImpl implements AMContainer {
     this.launcherId = launcherId;
     this.taskCommId = taskCommId;
     this.stateMachine = stateMachineFactory.make(this);
+    this.auxiliaryService = auxiliaryService;
   }
 
   @Override
@@ -498,7 +500,7 @@ public class AMContainerImpl implements AMContainer {
           cAddress,
           containerContext.getCredentials(),
           container.appContext, container.container.getResource(),
-          container.appContext.getAMConf());
+          container.appContext.getAMConf(), container.auxiliaryService);
 
       // Registering now, so that in case of delayed NM response, the child
       // task is not told to die since the TAL does not know about the container.

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index ab43db1..1b2fe16 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
@@ -42,6 +43,7 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   private final AppContext context;
   private final ContainerSignatureMatcher containerSignatureMatcher;
   private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+  private String auxiliaryService;
 
   public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
       ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@ -51,6 +53,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
     this.context = context;
     this.containerSignatureMatcher = containerSignatureMatcher;
     this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
+    this.auxiliaryService = context.getAMConf().get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
   }
 
   @Override
@@ -65,7 +69,7 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
 
   public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
     AMContainer amc = new AMContainerImpl(container, chh, tal,
-      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
+      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService);
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/886fac7f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index ed14871..65883ee 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
@@ -1261,7 +1262,8 @@ public class TestAMContainer {
       doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
 
       amContainer = new AMContainerImpl(container, chh, tal,
-          new ContainerContextMatcher(), appContext, 0, 0, 0);
+          new ContainerContextMatcher(), appContext, 0, 0, 0, conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+          TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
     }
 
     public WrappedContainer() {


Mime
View raw message