tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-725. Allow profiling of specific containers. (sseth)
Date Mon, 13 Jan 2014 19:56:04 GMT
Updated Branches:
  refs/heads/master b2490904e -> 3c5b18579


TEZ-725. Allow profiling of specific containers. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3c5b1857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3c5b1857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3c5b1857

Branch: refs/heads/master
Commit: 3c5b18579e9ce0dd9d82147f41609258d9448b80
Parents: b249090
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jan 13 11:47:18 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Jan 13 11:47:18 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  10 ++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  12 +-
 .../AMContainerEventLaunchRequest.java          |   9 +-
 .../app/rm/container/AMContainerHelpers.java    |  11 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  25 ++--
 .../dag/app/rm/container/AMContainerMap.java    |  37 +++++-
 .../tez/dag/utils/TezRuntimeChildJVM.java       |  14 +--
 .../dag/app/rm/container/TestAMContainer.java   |  27 ++++-
 .../app/rm/container/TestAMContainerMap.java    | 119 +++++++++++++++++++
 9 files changed, 212 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8561467..5b95f80 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -268,5 +268,15 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_GENERATE_DAG_VIZ =
       TEZ_PREFIX + "generate.dag.viz";
   public static final boolean TEZ_GENERATE_DAG_VIZ_DEFAULT = false;
+  
+  /**
+   * Comma separated list of containers which should be profiled.
+   */
+  public static final String TEZ_PROFILE_CONTAINER_LIST = TEZ_PREFIX + "profile.container.list";
+  
+  /**
+   * The string to be added to the JVM command line for containers being profiled.
+   */
+  public static final String TEZ_PROFILE_JVM_OPTS = TEZ_PREFIX + "profile.jvm.opts";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 66358dc..5473e77 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -322,7 +322,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     return new TaskScheduler(this, this.containerSignatureMatcher,
       host, port, trackingUrl, appContext);
   }
-
+  
   @Override
   public synchronized void serviceStart() {
     // FIXME hack alert how is this supposed to support multiple DAGs?
@@ -395,13 +395,9 @@ public class TaskSchedulerEventHandler extends AbstractService
     // because the deallocateTask downcall may have raced with the
     // taskAllocated() upcall
     assert task.equals(taskAttempt);
-    if (appContext.getAllContainers().get(containerId).getState()
-        == AMContainerState.ALLOCATED) {
-
-      sendEvent(new AMContainerEventLaunchRequest(
-          containerId,
-          taskAttempt.getVertexID(),
-          false,
+    
+    if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
           event.getContainerContext()));
     }
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index f97863a..d973264 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -26,15 +26,12 @@ import org.apache.tez.dag.records.TezVertexID;
 public class AMContainerEventLaunchRequest extends AMContainerEvent {
 
   private final TezVertexID vertexId;
-  private final boolean shouldProfile;
   private final ContainerContext containerContext;
 
   public AMContainerEventLaunchRequest(ContainerId containerId,
-      TezVertexID vertexId, boolean shouldProfile,
-      ContainerContext containerContext) {
+      TezVertexID vertexId, ContainerContext containerContext) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
     this.vertexId = vertexId;
-    this.shouldProfile = shouldProfile;
     this.containerContext = containerContext;
   }
 
@@ -46,10 +43,6 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
     return this.vertexId;
   }
 
-  public boolean shouldProfile() {
-    return this.shouldProfile;
-  }
-  
   public ContainerContext getContainerContext() {
     return this.containerContext;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index e9dc71c..4d14c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.rm.container;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
@@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.TezRuntimeChildJVM;
 import org.apache.tez.runtime.library.common.security.TokenCache;
@@ -142,8 +142,8 @@ public class AMContainerHelpers {
       Map<String, LocalResource> localResources,
       Map<String, String> vertexEnv,
       String javaOpts,
-      TaskAttemptListener taskAttemptListener, Credentials credentials,
-      boolean shouldProfile, AppContext appContext) {
+      InetSocketAddress taskAttemptListenerAddress, Credentials credentials,
+      boolean shouldProfile, String profileOpts, AppContext appContext) {
 
     ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
@@ -178,12 +178,11 @@ public class AMContainerHelpers {
     myEnv.putAll(env);
     myEnv.putAll(vertexEnv);
 
-    // Set up the launch command
     List<String> commands = TezRuntimeChildJVM.getVMCommand(
-        taskAttemptListener.getAddress(), containerId.toString(),
+        taskAttemptListenerAddress, containerId.toString(),
         appContext.getApplicationID().toString(),
         appContext.getApplicationAttemptId().getAttemptId(),
-        shouldProfile, javaOpts);
+        shouldProfile, profileOpts, javaOpts);
 
     // Duplicate the ByteBuffers for access by multiple containers.
     Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 2ea9c0c..95cb69d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -78,6 +78,10 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
+  @VisibleForTesting
+  final boolean shouldProfile;
+  @VisibleForTesting
+  final String profileJavaOpts;
 
   private final List<TezTaskAttemptID> completedAttempts =
       new LinkedList<TezTaskAttemptID>();
@@ -113,8 +117,6 @@ public class AMContainerImpl implements AMContainer {
   private boolean inError = false;
 
   @VisibleForTesting
-  ContainerLaunchContext clc;
-  @VisibleForTesting
   Map<String, LocalResource> containerLocalResources;
   @VisibleForTesting
   Map<String, LocalResource> additionalLocalResources;
@@ -202,7 +204,8 @@ public class AMContainerImpl implements AMContainer {
   // Attempting to use a container based purely on reosurces required, etc needs
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
-      TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher, AppContext appContext) {
+      TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
+      boolean shouldProfile, String profileJavaOpts, AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -214,6 +217,8 @@ public class AMContainerImpl implements AMContainer {
     this.containerHeartbeatHandler = chh;
     this.taskAttemptListener = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
+    this.shouldProfile = shouldProfile;
+    this.profileJavaOpts = profileJavaOpts;
 
     this.noAllocationContainerTask = WAIT_TASK;
     this.stateMachine = stateMachineFactory.make(this);
@@ -364,26 +369,22 @@ public class AMContainerImpl implements AMContainer {
       container.credentials = containerContext.getCredentials();
       container.credentialsChanged = true;
 
-      container.clc = AMContainerHelpers.createContainerLaunchContext(
+      ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
           container.appContext.getCurrentDAGID(),
           container.appContext.getApplicationACLs(),
           container.getContainerId(),
           containerContext.getLocalResources(),
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
-          container.taskAttemptListener, containerContext.getCredentials(),
-          event.shouldProfile(), container.appContext);
+          container.taskAttemptListener.getAddress(), containerContext.getCredentials(),
+          container.shouldProfile, container.profileJavaOpts, container.appContext);
 
       // Registering now, so that in case of delayed NM response, the child
       // task is not told to die since the TAL does not know about the container.
       container.registerWithTAListener();
-      container.sendStartRequestToNM();
+      container.sendStartRequestToNM(clc);
       LOG.info("Sending Launch Request for Container with id: " +
           container.container.getId());
-      // Forget about the clc to save resources. At some point, part of the clc
-      // info may need to be exposed to the scheduler to figure out whether a
-      // container can be used for a specific TaskAttempt.
-      container.clc = null;
     }
   }
 
@@ -1004,7 +1005,7 @@ public class AMContainerImpl implements AMContainer {
     sendEvent(new TaskAttemptEventNodeFailed(taId, message));
   }
 
-  protected void sendStartRequestToNM() {
+  protected void sendStartRequestToNM(ContainerLaunchContext clc) {
     sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 9d26299..390a083 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -19,20 +19,23 @@
 package org.apache.tez.dag.app.rm.container;
 
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
 
-public class AMContainerMap extends AbstractService implements
-    EventHandler<AMContainerEvent> {
+public class AMContainerMap extends AbstractService implements EventHandler<AMContainerEvent> {
 
   private static final Log LOG = LogFactory.getLog(AMContainerMap.class);
 
@@ -41,6 +44,8 @@ public class AMContainerMap extends AbstractService implements
   private final AppContext context;
   private final ContainerSignatureMatcher containerSignatureMatcher;
   private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+  private Set<Integer> profileContainerSet;
+  private String commonProfileJavaOpts;
 
   public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
       ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@ -53,9 +58,30 @@ public class AMContainerMap extends AbstractService implements
   }
 
   @Override
+  public synchronized void serviceInit(Configuration conf) {
+    Collection<String> profileContainers = getConfig().getTrimmedStringCollection(
+        TezConfiguration.TEZ_PROFILE_CONTAINER_LIST);
+    if (profileContainers != null && !profileContainers.isEmpty()) {
+      profileContainerSet = new HashSet<Integer>();
+      for (String containerNum : profileContainers) {
+        profileContainerSet.add(Integer.parseInt(containerNum));
+      }
+      commonProfileJavaOpts = conf.get(TezConfiguration.TEZ_PROFILE_JVM_OPTS);
+      if (!profileContainerSet.isEmpty()
+          && (commonProfileJavaOpts == null || commonProfileJavaOpts.isEmpty())) {
+        LOG.warn("Profiling specified for " + profileContainerSet.size()
+            + " containers, but no profiling string specified. "
+            + TezConfiguration.TEZ_PROFILE_JVM_OPTS + " needs to be set");
+      }
+      LOG.info("Containers to be profile: " + profileContainerSet );
+    }
+    
+  }
+
+  @Override
   public void handle(AMContainerEvent event) {
     AMContainer container = containerMap.get(event.getContainerId());
-    if(container != null) {
+    if (container != null) {
       container.handle(event);
     } else {
       LOG.info("Event for unknown container: " + event.getContainerId());
@@ -63,7 +89,10 @@ public class AMContainerMap extends AbstractService implements
   }
 
   public boolean addContainerIfNew(Container container) {
-    AMContainer amc = new AMContainerImpl(container, chh, tal, containerSignatureMatcher, context);
+    boolean shouldProfile = profileContainerSet == null ? false : profileContainerSet
+        .contains(container.getId().getId());
+    AMContainer amc = new AMContainerImpl(container, chh, tal, containerSignatureMatcher,
+        shouldProfile, shouldProfile ? commonProfileJavaOpts : null, context);
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index e1219c1..f91a909 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -70,6 +70,7 @@ public class TezRuntimeChildJVM {
       String tokenIdentifier,
       int applicationAttemptNumber,
       boolean shouldProfile,
+      String profileOpts,
       String javaOpts) {
 
     Vector<String> vargs = new Vector<String>(9);
@@ -79,21 +80,13 @@ public class TezRuntimeChildJVM {
     //set custom javaOpts
     vargs.add(javaOpts);
 
-//[Debug Task] Current simplest way to attach debugger to  Tez Child Task
-// Uncomment the following, then launch a regular job
-// Works best on one-box configured with a single container (hence one task at a time).
-//    LOG.error(" !!!!!!!!! Launching Child-Task in debug/suspend mode.  Attach to port 8003 !!!!!!!!");
-//    vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8003,server=y,suspend=y");
-
     Path childTmpDir = new Path(Environment.PWD.$(),
         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
     // FIXME Setup the log4j properties
-
-    // Decision to profile needs to be made in the scheduler.
-    if (shouldProfile) {
-      // FIXME add support for profiling
+    if (shouldProfile && profileOpts != null) {
+      vargs.add(profileOpts);
     }
 
     // Add main class and its arguments
@@ -109,6 +102,7 @@ public class TezRuntimeChildJVM {
     vargs.add("1>" + getTaskLogFile(LogName.STDOUT));
     vargs.add("2>" + getTaskLogFile(LogName.STDERR));
 
+    // TODO Is this StringBuilder really required ? YARN already accepts a list of commands.
     // Final commmand
     StringBuilder mergedCommand = new StringBuilder();
     for (CharSequence str : vargs) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 77b2fc9..f74d956 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -69,6 +70,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -897,7 +899,6 @@ public class TestAMContainer {
 
     // Verify references are cleared after a container completes.
     wc.containerCompleted(false);
-    assertNull(wc.amContainer.clc);
     assertNull(wc.amContainer.containerLocalResources);
     assertNull(wc.amContainer.additionalLocalResources);
   }
@@ -984,6 +985,20 @@ public class TestAMContainer {
     assertNull(fetchedTask.getCredentials());
     wc.taskAttemptSucceeded(attempt32);
   }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerProfiling() {
+    WrappedContainer wc = new WrappedContainer(true, "profileString");
+    wc.launchContainer();
+    List<Event> events = wc.verifyCountAndGetOutgoingEvents(1);
+    Event event = events.get(0);
+    assertTrue(event instanceof NMCommunicatorLaunchRequestEvent);
+    NMCommunicatorLaunchRequestEvent lrEvent = (NMCommunicatorLaunchRequestEvent) event;
+    ContainerLaunchContext clc = lrEvent.getContainerLaunchContext();
+    assertNotNull(clc);
+    assertTrue(clc.getCommands().get(0).contains("profileString"));
+  }
 
   // TODO Verify diagnostics in most of the tests.
 
@@ -1015,7 +1030,7 @@ public class TestAMContainer {
 
     public AMContainerImpl amContainer;
 
-    public WrappedContainer() {
+    public WrappedContainer(boolean shouldProfile, String profileString) {
       applicationID = ApplicationId.newInstance(rmIdentifier, 1);
       appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
       containerID = ContainerId.newInstance(appAttemptID, 1);
@@ -1051,7 +1066,11 @@ public class TestAMContainer {
       doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
 
       amContainer = new AMContainerImpl(container, chh, tal,
-          new ContainerContextMatcher(), appContext);
+          new ContainerContextMatcher(), shouldProfile, profileString, appContext);
+    }
+    
+    public WrappedContainer() {
+      this(false, null);
     }
     
     protected void mockDAGID() {
@@ -1090,7 +1109,7 @@ public class TestAMContainer {
       @SuppressWarnings("unchecked")
       Token<JobTokenIdentifier> jobToken = mock(Token.class);
       TokenCache.setJobToken(jobToken, credentials);
-      amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, false,
+      amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
           new ContainerContext(localResources, credentials, new HashMap<String, String>(), "")));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3c5b1857/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
new file mode 100644
index 0000000..b97aa45
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.container;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.junit.Test;
+
+public class TestAMContainerMap {
+
+  @Test
+  public void testAMContainerMap() throws IOException {
+    ContainerHeartbeatHandler chh = mockContainerHeartBeatHandler();
+    TaskAttemptListener tal = mockTaskAttemptListener();
+    AppContext context = mockAppContext();
+    AMContainerMap amContainerMap = new AMContainerMap(chh, tal, new ContainerContextMatcher(),
+        context);
+
+    Configuration conf = new Configuration();
+    conf.set(TezConfiguration.TEZ_PROFILE_CONTAINER_LIST, "2, 4");
+    conf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "testJvmOpts");
+
+    amContainerMap.init(conf);
+    amContainerMap.start();
+
+    ContainerId cId1 = mockContainerId(1);
+    ContainerId cId2 = mockContainerId(2);
+    ContainerId cId3 = mockContainerId(3);
+    ContainerId cId4 = mockContainerId(4);
+
+    amContainerMap.addContainerIfNew(mockContainer(cId1));
+    amContainerMap.addContainerIfNew(mockContainer(cId2));
+    amContainerMap.addContainerIfNew(mockContainer(cId3));
+    amContainerMap.addContainerIfNew(mockContainer(cId4));
+
+    AMContainerImpl amContainer = (AMContainerImpl) amContainerMap.get(cId1);
+    assertFalse(amContainer.shouldProfile);
+    assertNull(amContainer.profileJavaOpts);
+
+    amContainer = (AMContainerImpl) amContainerMap.get(cId2);
+    assertTrue(amContainer.shouldProfile);
+    assertEquals("testJvmOpts", amContainer.profileJavaOpts);
+
+    amContainer = (AMContainerImpl) amContainerMap.get(cId3);
+    assertFalse(amContainer.shouldProfile);
+    assertNull(amContainer.profileJavaOpts);
+
+    amContainer = (AMContainerImpl) amContainerMap.get(cId4);
+    assertTrue(amContainer.shouldProfile);
+    assertEquals("testJvmOpts", amContainer.profileJavaOpts);
+
+    amContainerMap.close();
+  }
+
+  private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
+    return mock(ContainerHeartbeatHandler.class);
+  }
+
+  private TaskAttemptListener mockTaskAttemptListener() {
+    TaskAttemptListener tal = mock(TaskAttemptListener.class);
+    InetSocketAddress socketAddr = new InetSocketAddress("localhost", 21000);
+    doReturn(socketAddr).when(tal).getAddress();
+    return tal;
+  }
+
+  private AppContext mockAppContext() {
+    AppContext appContext = mock(AppContext.class);
+    return appContext;
+  }
+
+  private ContainerId mockContainerId(int cId) {
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
+    return containerId;
+  }
+
+  private Container mockContainer(ContainerId containerId) {
+    NodeId nodeId = NodeId.newInstance("localhost", 43255);
+    Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
+        Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
+    return container;
+  }
+}


Mime
View raw message