tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-1032. Allow specifying tasks/vertices to be profiled. (Rajesh Balamohan via hitesh)
Date Wed, 11 Jun 2014 23:16:19 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master bd0e6bd2c -> 94499c9b8


TEZ-1032. Allow specifying tasks/vertices to be profiled. (Rajesh Balamohan via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/94499c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/94499c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/94499c9b

Branch: refs/heads/master
Commit: 94499c9b8b494e83f6a6d6b8bf166192b58d547f
Parents: bd0e6bd
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Jun 11 16:14:51 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Jun 11 16:14:51 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  20 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   6 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  32 ++-
 .../app/rm/container/AMContainerHelpers.java    |   5 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  11 +-
 .../dag/app/rm/container/AMContainerMap.java    |  29 +--
 .../tez/dag/utils/JavaProfilerOptions.java      | 168 ++++++++++++++
 .../tez/dag/utils/TezRuntimeChildJVM.java       |   8 -
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  11 +-
 .../tez/dag/app/rm/TestContainerReuse.java      | 196 +++++++++++++++-
 .../dag/app/rm/container/TestAMContainer.java   |  20 +-
 .../app/rm/container/TestAMContainerMap.java    |  44 ----
 .../tez/dag/utils/TestJavaProfilerOptions.java  | 225 +++++++++++++++++++
 13 files changed, 648 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0b7f52a..e2fdbb2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -343,14 +343,24 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_GENERATE_DAG_VIZ =
       TEZ_PREFIX + "generate.dag.viz";
   public static final boolean TEZ_GENERATE_DAG_VIZ_DEFAULT = true;
-  
+
   /**
-   * Comma separated list of containers which should be profiled.
+   * Set of tasks that should be profiled.
+   * Format: "vertexName[csv of task ids];vertexName[csv of task ids].."
+   * Valid e.g:
+   * v[0,1,2]  - Profile tasks 0,1,2 of vertex v
+   * v[1,2,3];v2[5,6,7] - Profile specified tasks of vertices v and v2.
+   * v[1:5,20,30];v2[2:5,60,7] - Profile 1,2,3,4,5,20,30 of vertex v; 2,3,4,5,60,7 of vertex v2
+   * Partial ranges like :5, 1: are not supported.
+   * v[] - Profile all tasks in vertex v
    */
-  public static final String TEZ_PROFILE_CONTAINER_LIST = TEZ_PREFIX + "profile.container.list";
-  
+  public static final String TEZ_PROFILE_TASK_LIST = TEZ_PREFIX + "profile.task.list";
+
   /**
-   * The string to be added to the JVM command line for containers being profiled.
+   * Additional string to be added to the JVM options for tasks being profiled.
+   * __VERTEX_NAME__ and __TASK_INDEX__ can be specified, which would be replaced at
+   * runtime by vertex name and task index being profiled.
+   * e.g. tez.profile.jvm.opts=-agentpath:libpagent.so,dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__
    */
   public static final String TEZ_PROFILE_JVM_OPTS = TEZ_PREFIX + "profile.jvm.opts";
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 61432c3..614bf98 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -105,6 +105,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.JavaProfilerOptions;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
@@ -169,6 +170,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   boolean recoveryInitEventSeen = false;
   boolean recoveryStartEventSeen = false;
 
+  private JavaProfilerOptions profilerOptions;
+
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
@@ -426,6 +429,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     this.aclsManager = new ApplicationACLsManager(conf);
 
+    this.profilerOptions = new JavaProfilerOptions(conf);
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -1250,7 +1254,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.eventHandler, dag.taskAttemptListener,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-        dag.vertexGroups);
+        dag.vertexGroups, dag.profilerOptions);
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bbe0ccb..8aba90a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -132,6 +132,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.JavaProfilerOptions;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.RootInputSpecUpdate;
@@ -600,12 +601,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   boolean recoveryStartEventSeen = false;
   private VertexStats vertexStats = null;
 
+  private final JavaProfilerOptions profilerOpts;
+
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
-      TaskHeartbeatHandler thh, boolean commitVertexOutputs, 
-      AppContext appContext, VertexLocationHint vertexLocationHint, 
-      Map<String, VertexGroupInfo> dagVertexGroups) {
+      TaskHeartbeatHandler thh, boolean commitVertexOutputs,
+      AppContext appContext, VertexLocationHint vertexLocationHint,
+      Map<String, VertexGroupInfo> dagVertexGroups, JavaProfilerOptions profilerOpts) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
@@ -642,7 +645,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             .getEnvironmentSettingList());
     this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan
         .getTaskConfig().getJavaOpts() : null;
-
+    this.profilerOpts = profilerOpts;
     this.containerContext = new ContainerContext(this.localResources,
         appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
 
@@ -652,11 +655,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (vertexPlan.getOutputsCount() > 0) {
       setAdditionalOutputs(vertexPlan.getOutputsList());
     }
-    
+
     // Setup the initial parallelism early. This may be changed after
     // initialization or on a setParallelism call.
     this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
-    
+
     this.dagVertexGroups = dagVertexGroups;
 
     logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
@@ -1720,9 +1723,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // no code, for now
   }
 
+  private ContainerContext getContainerContext(int taskIdx) {
+    if (profilerOpts.shouldProfileJVM(vertexName, taskIdx)) {
+      String jvmOpts = profilerOpts.getProfilerOptions(javaOpts, vertexName, taskIdx);
+      ContainerContext context = new ContainerContext(this.localResources,
+          appContext.getCurrentDAG().getCredentials(), this.environment, jvmOpts);
+      return context;
+    } else {
+      return this.containerContext;
+    }
+  }
+
   private void createTasks() {
-    Configuration conf = this.conf;
     for (int i=0; i < this.numTasks; ++i) {
+      ContainerContext conContext = getContainerContext(i);
       TaskImpl task =
           new TaskImpl(this.getVertexId(), i,
               this.eventHandler,
@@ -1734,10 +1748,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               (this.targetVertices != null ?
                 this.targetVertices.isEmpty() : true),
               this.taskResource,
-              this.containerContext);
+              conContext);
       this.addTask(task);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Created task for vertex " + this.getVertexId() + ": " +
+        LOG.debug("Created task for vertex " + logIdentifier + ": " +
             task.getTaskId());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index d65bbf1..b569fbd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -145,7 +145,7 @@ public class AMContainerHelpers {
       Map<String, String> vertexEnv,
       String javaOpts,
       InetSocketAddress taskAttemptListenerAddress, Credentials credentials,
-      boolean shouldProfile, String profileOpts, AppContext appContext) {
+      AppContext appContext) {
 
     ContainerLaunchContext commonContainerSpec = null;
     synchronized (commonContainerSpecLock) {
@@ -183,8 +183,7 @@ public class AMContainerHelpers {
     List<String> commands = TezRuntimeChildJVM.getVMCommand(
         taskAttemptListenerAddress, containerId.toString(),
         appContext.getApplicationID().toString(),
-        appContext.getApplicationAttemptId().getAttemptId(),
-        shouldProfile, profileOpts, javaOpts);
+        appContext.getApplicationAttemptId().getAttemptId(), javaOpts);
 
     // Duplicate the ByteBuffers for access by multiple containers.
     Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 6e6fa99..ae57658 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -82,10 +82,6 @@ public class AMContainerImpl implements AMContainer {
   private final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
   private final ContainerSignatureMatcher signatureMatcher;
-  @VisibleForTesting
-  final boolean shouldProfile;
-  @VisibleForTesting
-  final String profileJavaOpts;
 
   private final List<TezTaskAttemptID> completedAttempts =
       new LinkedList<TezTaskAttemptID>();
@@ -339,7 +335,7 @@ public class AMContainerImpl implements AMContainer {
   // additional change - JvmID, YarnChild, etc depend on TaskType.
   public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
       TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
-      boolean shouldProfile, String profileJavaOpts, AppContext appContext) {
+      AppContext appContext) {
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -351,9 +347,6 @@ public class AMContainerImpl implements AMContainer {
     this.containerHeartbeatHandler = chh;
     this.taskAttemptListener = tal;
     this.failedAssignments = new LinkedList<TezTaskAttemptID>();
-    this.shouldProfile = shouldProfile;
-    this.profileJavaOpts = profileJavaOpts;
-
     this.noAllocationContainerTask = WAIT_TASK;
     this.stateMachine = stateMachineFactory.make(this);
   }
@@ -511,7 +504,7 @@ public class AMContainerImpl implements AMContainer {
           containerContext.getEnvironment(),
           containerContext.getJavaOpts(),
           container.taskAttemptListener.getAddress(), containerContext.getCredentials(),
-          container.shouldProfile, container.profileJavaOpts, container.appContext);
+          container.appContext);
 
       // Registering now, so that in case of delayed NM response, the child
       // task is not told to die since the TAL does not know about the container.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 390a083..77d91fd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -44,8 +44,6 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   private final AppContext context;
   private final ContainerSignatureMatcher containerSignatureMatcher;
   private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
-  private Set<Integer> profileContainerSet;
-  private String commonProfileJavaOpts;
 
   public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
       ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@ -58,27 +56,6 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   }
 
   @Override
-  public synchronized void serviceInit(Configuration conf) {
-    Collection<String> profileContainers = getConfig().getTrimmedStringCollection(
-        TezConfiguration.TEZ_PROFILE_CONTAINER_LIST);
-    if (profileContainers != null && !profileContainers.isEmpty()) {
-      profileContainerSet = new HashSet<Integer>();
-      for (String containerNum : profileContainers) {
-        profileContainerSet.add(Integer.parseInt(containerNum));
-      }
-      commonProfileJavaOpts = conf.get(TezConfiguration.TEZ_PROFILE_JVM_OPTS);
-      if (!profileContainerSet.isEmpty()
-          && (commonProfileJavaOpts == null || commonProfileJavaOpts.isEmpty())) {
-        LOG.warn("Profiling specified for " + profileContainerSet.size()
-            + " containers, but no profiling string specified. "
-            + TezConfiguration.TEZ_PROFILE_JVM_OPTS + " needs to be set");
-      }
-      LOG.info("Containers to be profile: " + profileContainerSet );
-    }
-    
-  }
-
-  @Override
   public void handle(AMContainerEvent event) {
     AMContainer container = containerMap.get(event.getContainerId());
     if (container != null) {
@@ -89,10 +66,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
   }
 
   public boolean addContainerIfNew(Container container) {
-    boolean shouldProfile = profileContainerSet == null ? false : profileContainerSet
-        .contains(container.getId().getId());
-    AMContainer amc = new AMContainerImpl(container, chh, tal, containerSignatureMatcher,
-        shouldProfile, shouldProfile ? commonProfileJavaOpts : null, context);
+    AMContainer amc = new AMContainerImpl(container, chh, tal,
+      containerSignatureMatcher, context);
     return (containerMap.putIfAbsent(container.getId(), amc) == null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
new file mode 100644
index 0000000..6539944
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
@@ -0,0 +1,168 @@
+package org.apache.tez.dag.utils;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+/**
+ * Placeholder to store JVM profiler related information
+ */
+public class JavaProfilerOptions {
+  private static final Log LOG = LogFactory.getLog(JavaProfilerOptions.class);
+
+  //To check any characters apart from "a-zA-Z_0-9 : ; , [] space" anywhere in input.
+  final static Pattern INVALID_TASKS_TO_PROFILE_REGEX = Pattern
+    .compile("[^(\\w\\s;:,\\[\\])]");
+
+  //vertex name may only contain [a-zA-Z_0-9] and space. Task id is expected to be a number
+  //: is used for specifying task id range. , is used as task id separator.
+  final static Pattern TASKS_TO_PROFILE_REGEX = Pattern
+    .compile("([\\w\\s]+)\\[([\\d:,\\s]*)\\];?");
+
+  //Range regex where ':' must always be preceded and followed by digits.
+  final static Pattern RANGE_REGEX = Pattern.compile("(\\d+):(\\d+)");
+
+  private Map<String, BitSet> tasksToProfileMap;
+  private String jvmProfilingOpts;
+
+  public JavaProfilerOptions(Configuration conf) {
+    jvmProfilingOpts =
+        conf.getTrimmed(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "");
+
+    if (!Strings.isNullOrEmpty(jvmProfilingOpts)) {
+      this.tasksToProfileMap = getTasksToProfile(conf);
+      if (!tasksToProfileMap.isEmpty() && jvmProfilingOpts.isEmpty()) {
+        LOG.warn(TezConfiguration.TEZ_PROFILE_JVM_OPTS
+            + " should be specified for profiling");
+      }
+    }
+  }
+
+  /**
+   * Get profiler options for a specific task in the vertex
+   *
+   * @param jvmOpts
+   *          vertex specific JVM option
+   * @param vertexName
+   * @param taskIdx
+   * @return the vertex JVM options followed by the profiling options, with placeholders replaced
+   */
+  public String getProfilerOptions(String jvmOpts, String vertexName,
+      int taskIdx) {
+    jvmOpts = (jvmOpts == null) ? "" : jvmOpts;
+    vertexName = vertexName.replaceAll(" ", "");
+    String result =
+        jvmProfilingOpts.replaceAll("__VERTEX_NAME__", vertexName)
+          .replaceAll("__TASK_INDEX__", Integer.toString(taskIdx));
+    result = (jvmOpts + " " + result);
+
+    LOG.info("Profiling option added to vertexName=" + vertexName
+        + ", taskIdx=" + taskIdx + ", jvmOpts=" + result.trim());
+
+    return result.trim();
+  }
+
+  /**
+   * Find if the JVM needs profiling
+   *
+   * @param vertexName
+   * @param taskId
+   * @return boolean
+   */
+  public boolean shouldProfileJVM(String vertexName, int taskId) {
+    if (tasksToProfileMap == null || taskId < 0) {
+      return false;
+    }
+    BitSet taskSet = tasksToProfileMap.get(vertexName);
+    // profile all tasks in the vertex, if taskSet is empty
+    return (taskSet == null) ? false : ((taskSet.isEmpty()) ? true : taskSet.get(taskId));
+  }
+
+  /**
+   * <pre>
+   * Get the set of tasks to be profiled in the job. Example formats are
+   * v[0,1,2] - To profile subset of tasks in a vertex
+   * v[1,2,3];v2[5,6,7] - To profile multiple vertices
+   * v[1:5,20,30];v2[2:5,60,7] - To support range of tasks in vertices. Partial
+   * ranges are not supported (e.g v[:5],v2[2:]).
+   * v[] - To profile all tasks in a vertex
+   * </pre>
+   *
+   * @param conf
+   * @return Map<String, BitSet>
+   */
+  private Map<String, BitSet> getTasksToProfile(Configuration conf) {
+    String tasksToProfile =
+        conf.getTrimmed(TezConfiguration.TEZ_PROFILE_TASK_LIST, "");
+    final Map<String, BitSet> resultSet = new HashMap<String, BitSet>();
+    if (tasksToProfile.isEmpty() || !isValid(tasksToProfile)) {
+      return resultSet; // empty set
+    }
+    Matcher matcher = TASKS_TO_PROFILE_REGEX.matcher(tasksToProfile);
+    while (matcher.find()) {
+      String vertexName = matcher.group(1).trim();
+      BitSet profiledTaskSet = parseTasksToProfile(matcher.group(2).trim());
+      resultSet.put(vertexName, profiledTaskSet);
+    }
+    LOG.info("Tasks to profile info=" + resultSet);
+    return resultSet;
+  }
+
+  private boolean isValid(String tasksToProfile) {
+    if (INVALID_TASKS_TO_PROFILE_REGEX.matcher(tasksToProfile).find()) {
+      LOG.warn("Invalid option specified, "
+          + TezConfiguration.TEZ_PROFILE_TASK_LIST + "=" + tasksToProfile);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Get the set of tasks to be profiled within a vertex
+   *
+   * @param tasksToProfileInVertex
+   * @return BitSet of the task indexes to be profiled; null if a partial range is specified
+   */
+  private BitSet parseTasksToProfile(String tasksToProfileInVertex) {
+    BitSet profiledTaskSet = new BitSet();
+    if (Strings.isNullOrEmpty(tasksToProfileInVertex)) {
+      return profiledTaskSet;
+    }
+    Iterable<String> tasksInVertex =
+        Splitter.on(",").omitEmptyStrings().trimResults().split(tasksToProfileInVertex);
+    for (String task : tasksInVertex) {
+      /**
+       * TODO: this is a horrible way to check the ranges.
+       * Should use RangeSet when guava is upgraded.  Also, need to support partial
+       * ranges like "1:", ":50".  With current implementation partial ranges are not
+       * allowed.
+       */
+      if (task.endsWith(":") || task.startsWith(":")) {
+       //invalid range. e.g :20, 6: are not supported.
+        LOG.warn("Partial range is considered as an invalid option");
+        return null;
+      }
+      Matcher taskMatcher = RANGE_REGEX.matcher(task);
+      if (taskMatcher.find()) {
+        int start = Integer.parseInt((taskMatcher.group(1).trim()));
+        int end = Integer.parseInt((taskMatcher.group(2).trim()));
+        for (int i = Math.min(start, end); i <= Math.max(start, end); i++) {
+          profiledTaskSet.set(i);
+        }
+      } else {
+        profiledTaskSet.set(Integer.parseInt(task.trim()));
+      }
+    }
+    return profiledTaskSet;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index 46e200e..09239a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -69,8 +69,6 @@ public class TezRuntimeChildJVM {
       String containerIdentifier,
       String tokenIdentifier,
       int applicationAttemptNumber,
-      boolean shouldProfile,
-      String profileOpts,
       String javaOpts) {
 
     Vector<String> vargs = new Vector<String>(9);
@@ -84,11 +82,6 @@ public class TezRuntimeChildJVM {
         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
-    // FIXME Setup the log4j properties
-    if (shouldProfile && profileOpts != null) {
-      vargs.add(profileOpts);
-    }
-
     // Add main class and its arguments
     vargs.add(TezChild.class.getName());  // main of Child
 
@@ -112,5 +105,4 @@ public class TezRuntimeChildJVM {
     vargsFinal.add(mergedCommand.toString());
     return vargsFinal;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index db214e9..373a5e9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -121,6 +122,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.JavaProfilerOptions;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
@@ -178,6 +180,7 @@ public class TestVertexImpl {
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
   private HistoryEventHandler historyEventHandler;
+  private static JavaProfilerOptions javaProfilerOptions;
 
   public static class CountingOutputCommitter extends
       OutputCommitter {
@@ -1372,7 +1375,7 @@ public class TestVertexImpl {
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
-            clock, thh, true, appContext, locationHint, vertexGroups);
+            clock, thh, true, appContext, locationHint, vertexGroups, javaProfilerOptions);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -1428,6 +1431,8 @@ public class TestVertexImpl {
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
     dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
+    javaProfilerOptions = mock(JavaProfilerOptions.class);
+    doReturn(false).when(javaProfilerOptions).shouldProfileJVM(any(String.class), anyInt());
   }
 
   public void setupPostDagCreation() {
@@ -2743,7 +2748,7 @@ public class TestVertexImpl {
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener,
-          clock, thh, true, appContext, vertexLocationHint, null);
+          clock, thh, true, appContext, vertexLocationHint, null, javaProfilerOptions);
       vertexIdMap.put(vId, v);
       vertices.put(v.getName(), v);
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
@@ -2772,7 +2777,7 @@ public class TestVertexImpl {
         AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
-          appContext, vertexLocationHint, null);
+          appContext, vertexLocationHint, null, javaProfilerOptions);
       this.dispatcher = dispatcher;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 2e89b66..307af71 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -85,6 +85,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.JavaProfilerOptions;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -479,6 +480,192 @@ public class TestContainerReuse {
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
+    taskScheduler.close();
+    taskSchedulerEventHandler.close();
+  }
+
+  @Test(timeout = 10000l)
+  public void testReuseWithProfilerOption() throws IOException, InterruptedException, ExecutionException {
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS, 0);
+    //Profile 3 tasks
+    tezConf.set(TezConfiguration.TEZ_PROFILE_TASK_LIST, "v1[1,3,4]");
+    tezConf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
+    JavaProfilerOptions profilerOptions =  new JavaProfilerOptions(tezConf);
+
+    RackResolver.init(tezConf);
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+    String appUrl = "url";
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+
+    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
+
+    AppContext appContext = mock(AppContext.class);
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    //Use ContainerContextMatcher here.  Otherwise it would not match the JVM options
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
+        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher());
+    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
+    taskSchedulerEventHandler.init(tezConf);
+    taskSchedulerEventHandler.start();
+
+    TaskSchedulerWithDrainableAppCallback taskScheduler =
+        (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+          .getSpyTaskScheduler();
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    String[] host1 = {"host1"};
+    String[] host2 = {"host2"};
+    String[] host3 = {"host3"};
+
+    String []racks = {"/default-rack"};
+    Priority priority1 = Priority.newInstance(1);
+
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    String jvmOpts = profilerOptions.getProfilerOptions("", "v1", 1);
+
+    /**
+     * Schedule 2 tasks (1 with profiler option and another in normal mode).
+     * Container should not be reused in this case.
+     */
+    //Vertex 1, Task 1, Attempt 1, host1
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+    TaskAttempt ta11 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent1 =
+        createLaunchRequestEvent(taID11, ta11, resource1, host1, racks,
+          priority1, localResources, jvmOpts);
+
+    //Vertex 1, Task 2, Attempt 1, host1
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
+    TaskAttempt ta12 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent2 =
+        createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
+
+    taskSchedulerEventHandler.handleEvent(lrEvent1);
+    taskSchedulerEventHandler.handleEvent(lrEvent2);
+
+    Container container1 = createContainer(1, "host1", resource1, priority1);
+
+    // One container allocated.
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
+
+    // First task had profiling on. This container can not be reused further.
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class), eq(container1));
+    verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    /**
+     * Schedule 2 tasks (both having different profiler option).
+     * Container should not be reused.
+     */
+    //Vertex 1, Task 3, Attempt 1, host2
+    jvmOpts = profilerOptions.getProfilerOptions("", "v1", 3);
+    TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+    TaskAttempt ta13 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent3 =
+        createLaunchRequestEvent(taID13, ta13, resource1, host2, racks,
+          priority1, localResources, jvmOpts);
+
+    //Vertex 1, Task 4, Attempt 1, host2
+    jvmOpts = profilerOptions.getProfilerOptions("", "v1", 4);
+    TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+    TaskAttempt ta14 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent4 =
+        createLaunchRequestEvent(taID14, ta14, resource1, host2, racks,
+          priority1, localResources, jvmOpts);
+
+    Container container2 = createContainer(2, "host2", resource1, priority1);
+    taskSchedulerEventHandler.handleEvent(lrEvent3);
+    taskSchedulerEventHandler.handleEvent(lrEvent4);
+
+    // Container started
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container2));
+
+    // Verify that the container can not be reused when profiling option is turned on
+    // Reuse is only possible when 2 tasks resolve to the same profiling option.
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container2));
+    verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    /**
+     * Schedule 2 tasks with same jvm profiling option.
+     * Container should be reused.
+     */
+    tezConf.set(TezConfiguration.TEZ_PROFILE_TASK_LIST, "v1[1,2,3,5,6]");
+    tezConf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "dummyOpts");
+    profilerOptions =  new JavaProfilerOptions(tezConf);
+
+    //Vertex 1, Task 5, Attempt 1, host3
+    TezTaskAttemptID taID15 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+    TaskAttempt ta15 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent5 =
+        createLaunchRequestEvent(taID15, ta15, resource1, host3, racks,
+          priority1, localResources, profilerOptions. getProfilerOptions("", "v1", 5));
+
+    //Vertex 1, Task 6, Attempt 1, host3
+    jvmOpts = profilerOptions.getProfilerOptions("", "v1", 4);
+    TezTaskAttemptID taID16 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+    TaskAttempt ta16 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent6 =
+        createLaunchRequestEvent(taID16, ta16, resource1, host3, racks,
+          priority1, localResources, profilerOptions. getProfilerOptions("", "v1", 6));
+
+    // Container started
+    Container container3 = createContainer(2, "host3", resource1, priority1);
+    taskSchedulerEventHandler.handleEvent(lrEvent5);
+    taskSchedulerEventHandler.handleEvent(lrEvent6);
+
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container3));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
+
+    //Ensure task 6 (of vertex 1) is allocated to same container
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED));
+    drainableAppCallback.drain();
+    verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
+    eventHandler.reset();
 
     taskScheduler.close();
     taskSchedulerEventHandler.close();
@@ -930,7 +1117,14 @@ public class TestContainerReuse {
       TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority,
       Map<String, LocalResource> localResources) {
     return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority,
-        new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), ""));
+        localResources, "");
+  }
+
+  private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID,
+      TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority,
+      Map<String, LocalResource> localResources, String jvmOpts) {
+    return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority,
+        new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), jvmOpts));
   }
 
   private static class ChangingDAGIDAnswer implements Answer<TezDAGID> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 8084f0d..0862116 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -981,20 +981,6 @@ public class TestAMContainer {
     assertNull(fetchedTask.getCredentials());
     wc.taskAttemptSucceeded(attempt32);
   }
-  
-  @SuppressWarnings("rawtypes")
-  @Test
-  public void testContainerProfiling() {
-    WrappedContainer wc = new WrappedContainer(true, "profileString");
-    wc.launchContainer();
-    List<Event> events = wc.verifyCountAndGetOutgoingEvents(1);
-    Event event = events.get(0);
-    assertTrue(event instanceof NMCommunicatorLaunchRequestEvent);
-    NMCommunicatorLaunchRequestEvent lrEvent = (NMCommunicatorLaunchRequestEvent) event;
-    ContainerLaunchContext clc = lrEvent.getContainerLaunchContext();
-    assertNotNull(clc);
-    assertTrue(clc.getCommands().get(0).contains("profileString"));
-  }
 
   // TODO Verify diagnostics in most of the tests.
 
@@ -1067,13 +1053,13 @@ public class TestAMContainer {
       doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
 
       amContainer = new AMContainerImpl(container, chh, tal,
-          new ContainerContextMatcher(), shouldProfile, profileString, appContext);
+          new ContainerContextMatcher(), appContext);
     }
-    
+
     public WrappedContainer() {
       this(false, null);
     }
-    
+
     protected void mockDAGID() {
       doReturn(dagID).when(appContext).getCurrentDAGID();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index b97aa45..a6d2f16 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -43,50 +43,6 @@ import org.junit.Test;
 
 public class TestAMContainerMap {
 
-  @Test
-  public void testAMContainerMap() throws IOException {
-    ContainerHeartbeatHandler chh = mockContainerHeartBeatHandler();
-    TaskAttemptListener tal = mockTaskAttemptListener();
-    AppContext context = mockAppContext();
-    AMContainerMap amContainerMap = new AMContainerMap(chh, tal, new ContainerContextMatcher(),
-        context);
-
-    Configuration conf = new Configuration();
-    conf.set(TezConfiguration.TEZ_PROFILE_CONTAINER_LIST, "2, 4");
-    conf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, "testJvmOpts");
-
-    amContainerMap.init(conf);
-    amContainerMap.start();
-
-    ContainerId cId1 = mockContainerId(1);
-    ContainerId cId2 = mockContainerId(2);
-    ContainerId cId3 = mockContainerId(3);
-    ContainerId cId4 = mockContainerId(4);
-
-    amContainerMap.addContainerIfNew(mockContainer(cId1));
-    amContainerMap.addContainerIfNew(mockContainer(cId2));
-    amContainerMap.addContainerIfNew(mockContainer(cId3));
-    amContainerMap.addContainerIfNew(mockContainer(cId4));
-
-    AMContainerImpl amContainer = (AMContainerImpl) amContainerMap.get(cId1);
-    assertFalse(amContainer.shouldProfile);
-    assertNull(amContainer.profileJavaOpts);
-
-    amContainer = (AMContainerImpl) amContainerMap.get(cId2);
-    assertTrue(amContainer.shouldProfile);
-    assertEquals("testJvmOpts", amContainer.profileJavaOpts);
-
-    amContainer = (AMContainerImpl) amContainerMap.get(cId3);
-    assertFalse(amContainer.shouldProfile);
-    assertNull(amContainer.profileJavaOpts);
-
-    amContainer = (AMContainerImpl) amContainerMap.get(cId4);
-    assertTrue(amContainer.shouldProfile);
-    assertEquals("testJvmOpts", amContainer.profileJavaOpts);
-
-    amContainerMap.close();
-  }
-
   private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
     return mock(ContainerHeartbeatHandler.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/94499c9b/tez-dag/src/test/java/org/apache/tez/dag/utils/TestJavaProfilerOptions.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/utils/TestJavaProfilerOptions.java b/tez-dag/src/test/java/org/apache/tez/dag/utils/TestJavaProfilerOptions.java
new file mode 100644
index 0000000..60c3d79
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/utils/TestJavaProfilerOptions.java
@@ -0,0 +1,225 @@
+package org.apache.tez.dag.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.junit.Test;
+
+public class TestJavaProfilerOptions {
+  static Configuration conf = new Configuration();
+
+  private JavaProfilerOptions getOptions(Configuration conf, String tasks) {
+    return getOptions(conf, tasks, "dummyOpts");
+  }
+
+  private JavaProfilerOptions getOptions(Configuration conf, String tasks, String jvmOpts) {
+    conf.set(TezConfiguration.TEZ_PROFILE_TASK_LIST, tasks);
+    conf.set(TezConfiguration.TEZ_PROFILE_JVM_OPTS, jvmOpts);
+    return new JavaProfilerOptions(conf);
+  }
+
+  @Test
+  public void testTasksToBeProfiled() {
+    Random rnd = new Random();
+    Configuration conf = new Configuration();
+    JavaProfilerOptions profilerOptions = getOptions(conf, "");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+
+    profilerOptions = getOptions(conf, "v[0,1,2]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 0));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 2));
+    assertFalse(profilerOptions.shouldProfileJVM("v1", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[,5]", "dummyOpts");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 5));
+
+    profilerOptions = getOptions(conf, "v 1[1,5]", "dummyOpts");
+    assertTrue(profilerOptions.shouldProfileJVM("v 1", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v 1", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+    assertFalse(profilerOptions.shouldProfileJVM("1", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v 1[1,5], 5 [50,60], m  1[10, 11],", "dummyOpts");
+    assertTrue(profilerOptions.shouldProfileJVM("v 1", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v 1", 5));
+    assertTrue(profilerOptions.shouldProfileJVM("m  1", 10));
+    assertTrue(profilerOptions.shouldProfileJVM("m  1", 11));
+    assertTrue(profilerOptions.shouldProfileJVM("5", 50));
+    assertTrue(profilerOptions.shouldProfileJVM("5", 60));
+    assertFalse(profilerOptions.shouldProfileJVM("5", 600));
+    assertFalse(profilerOptions.shouldProfileJVM("m  1", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("1", rnd.nextInt(Integer.MAX_VALUE)));
+    assertFalse(profilerOptions.shouldProfileJVM("", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v 1[1,5], 5 [50,60],  @#425[10, 11]", "dummyOpts");
+    assertFalse(profilerOptions.shouldProfileJVM("v 1", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("@#425", 10));
+
+    profilerOptions = getOptions(conf, "v[0,1,2];v2[5,6:8]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 0));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 2));
+    assertTrue(profilerOptions.shouldProfileJVM("v2", 5));
+    assertTrue(profilerOptions.shouldProfileJVM("v2", 6));
+    assertTrue(profilerOptions.shouldProfileJVM("v2", 7));
+    assertTrue(profilerOptions.shouldProfileJVM("v2", 8));
+    assertFalse(profilerOptions.shouldProfileJVM("v5", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[1:3,5]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 2));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 3));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v5", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[3:1,5]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 2));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 3));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 5));
+
+    profilerOptions = getOptions(conf, "v[-1]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", -1));
+
+    // Profile all tasks in a vertex. ANY task in the vertex
+    profilerOptions = getOptions(conf, "v[]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 0));
+    assertTrue(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[,, ,]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[    ]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[:,,]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " v[3:1,4]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 4));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 3));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " v[1:3,4, 5]");
+    assertTrue(profilerOptions.shouldProfileJVM("v", 4));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 2));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 4));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 1));
+    assertTrue(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " v[:,,:, 5]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+
+    profilerOptions = getOptions(conf, " v[ : ,,]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    // Negative tests
+    profilerOptions = getOptions(conf, "v12#fs[0,1,2]");
+    assertFalse(profilerOptions.shouldProfileJVM("v12#fs", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("fs", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("#", 0));
+
+    profilerOptions = getOptions(conf, "v[-3:1,5]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+
+    profilerOptions = getOptions(conf, " ^&*%[0,1,2]");
+    assertFalse(profilerOptions.shouldProfileJVM("^&*%", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+
+    profilerOptions = getOptions(conf, "^&*%[0,1,2]");
+    assertFalse(profilerOptions.shouldProfileJVM("^&*%", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("^&*%", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[-1]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", -1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " [:, 4:]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[:,,:]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[:5,1]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 2));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 3));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 6));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[1:,5]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, "v[:1,5]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 0));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", -1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " v[1:,4, 5],    [5,4]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 5));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 3));
+    assertFalse(profilerOptions.shouldProfileJVM(" ", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+
+    profilerOptions = getOptions(conf, " v[-3:1,4]");
+    assertFalse(profilerOptions.shouldProfileJVM("v", 4));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 1));
+    assertFalse(profilerOptions.shouldProfileJVM("v", 3));
+    assertFalse(profilerOptions.shouldProfileJVM("v", -3));
+    assertFalse(profilerOptions.shouldProfileJVM("v", rnd.nextInt(Integer.MAX_VALUE)));
+  }
+
+  @Test
+  public void testProfilingJVMOpts() {
+    Configuration conf = new Configuration();
+    JavaProfilerOptions profilerOptions = getOptions(conf, "", "");
+    String jvmOpts = profilerOptions.getProfilerOptions("", "", 0);
+    assertTrue(jvmOpts.trim().equals(""));
+
+    profilerOptions = getOptions(conf, "", "dir=__VERTEX_NAME__");
+    jvmOpts = profilerOptions.getProfilerOptions("", "Map 1", 0);
+    assertTrue(jvmOpts.equals("dir=Map1"));
+
+    profilerOptions = getOptions(conf, "", "dir=__TASK_INDEX__");
+    jvmOpts = profilerOptions.getProfilerOptions("", "Map 1", 0);
+    assertTrue(jvmOpts.equals("dir=0"));
+
+    profilerOptions = getOptions(conf, "v[1,3,4]", "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
+    jvmOpts = profilerOptions.getProfilerOptions("", "v", 1);
+    assertTrue(jvmOpts.equals("dir=/tmp/v/1"));
+
+    jvmOpts = profilerOptions.getProfilerOptions("", "v", 3);
+    assertTrue(jvmOpts.equals("dir=/tmp/v/3"));
+
+    jvmOpts = profilerOptions.getProfilerOptions("", "v", 4);
+    assertTrue(jvmOpts.equals("dir=/tmp/v/4"));
+  }
+}


Mime
View raw message