tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-766. Support an api to pre-warm containers for a session. (hitesh)
Date Tue, 04 Feb 2014 21:51:53 GMT
Updated Branches:
  refs/heads/master 55e79c622 -> 4e51cb851


TEZ-766. Support an api to pre-warm containers for a session. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4e51cb85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4e51cb85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4e51cb85

Branch: refs/heads/master
Commit: 4e51cb851e70f7913911360946482f855a93b043
Parents: 55e79c6
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Feb 4 13:51:29 2014 -0800
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Feb 4 13:51:29 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/PreWarmContext.java   | 122 ++++++++++++++++
 .../org/apache/tez/client/TezClientUtils.java   |  17 ++-
 .../java/org/apache/tez/client/TezSession.java  | 146 ++++++++++++++-----
 .../main/java/org/apache/tez/dag/api/DAG.java   |   6 +-
 .../apache/tez/dag/api/DagTypeConverters.java   | 100 +++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java    |  13 --
 tez-api/src/main/proto/DAGApiRecords.proto      |  15 ++
 .../src/main/proto/DAGClientAMProtocol.proto    |   8 +
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  17 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 133 +++++++++--------
 .../mapreduce/examples/OrderedWordCount.java    |  50 ++++++-
 11 files changed, 500 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java b/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
new file mode 100644
index 0000000..39eb20f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+
+import java.util.Map;
+
+/**
+ * Context to define how the pre-warm containers should be launched within a
+ * session.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PreWarmContext {
+
+  private final ProcessorDescriptor processorDescriptor;
+  private final Resource resource;
+  private final VertexLocationHint locationHints;
+  private Map<String, LocalResource> localResources;
+  private Map<String, String> environment;
+  private String javaOpts;
+
+  /**
+   * Context to define how to pre-warm a TezSession.
+   * @param processorDescriptor The processor to run within a Tez Task
+   *                            after launching a container
+   * @param resource The resource requirements for each container
+   * @param locationHints The num of tasks to run as well as the location hints
+   *                      for the containers to be launched.
+   *                      The num of tasks can drive how many containers are launched.
+   *                      However, as containers are re-used, the total number
+   *                      of launched containers will likely be less than the
+   *                      specified number of tasks.
+   */
+  public PreWarmContext(ProcessorDescriptor processorDescriptor,
+      Resource resource,
+      VertexLocationHint locationHints) {
+    this.processorDescriptor =  processorDescriptor;
+    this.resource = resource;
+    this.locationHints = locationHints;
+  }
+
+  /**
+   * Set the LocalResources for the pre-warm containers.
+   * @param localResources LocalResources for the container
+   * @return this
+   */
+  public PreWarmContext setLocalResources(
+      Map<String, LocalResource> localResources) {
+    this.localResources = localResources;
+    return this;
+  }
+
+
+  /**
+   * Set the Environment for the pre-warm containers.
+   * @param environment Container environment
+   * @return this
+   */
+  public PreWarmContext setEnvironment(
+      Map<String, String> environment) {
+    this.environment = environment;
+    return this;
+  }
+
+  /**
+   * Set the Java opts for the pre-warm containers.
+   * @param javaOpts Container java opts
+   * @return this
+   */
+  public PreWarmContext setJavaOpts(String javaOpts) {
+    this.javaOpts = javaOpts;
+    return this;
+  }
+
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return processorDescriptor;
+  }
+
+  public Resource getResource() {
+    return resource;
+  }
+
+  public VertexLocationHint getLocationHints() {
+    return locationHints;
+  }
+
+  public Map<String, LocalResource> getLocalResources() {
+    return localResources;
+  }
+
+  public Map<String, String> getEnvironment() {
+    return environment;
+  }
+
+  public String getJavaOpts() {
+    return javaOpts;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index c6a3c3b..2a8c848 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -371,7 +371,10 @@ public class TezClientUtils {
 
     // Setup the CLASSPATH in environment
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
-    Map<String, String> environment = createEnvironment(conf);
+    String classpath = getFrameworkClasspath(conf);
+
+    Map<String, String> environment = new TreeMap<String, String>();
+    environment.put(Environment.CLASSPATH.name(), classpath);
 
     if (amConfig.getEnv() != null) {
       for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
@@ -537,14 +540,14 @@ public class TezClientUtils {
     appContext.setAMContainerSpec(amContainer);
     
     appContext.setMaxAppAttempts(
-        finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 
-            TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
+      finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
+        TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
 
     return appContext;
 
   }
   
-  static Map<String, String> createEnvironment(Configuration conf) {
+  static String getFrameworkClasspath(Configuration conf) {
     Map<String, String> environment = new HashMap<String, String>();
 
     Apps.addToEnvironment(environment,
@@ -562,7 +565,7 @@ public class TezClientUtils {
       Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
           c.trim());
     }
-    return environment;
+    return environment.get(Environment.CLASSPATH.name());
   }
 
   @VisibleForTesting
@@ -711,7 +714,9 @@ public class TezClientUtils {
           serviceAddr);
       userUgi.addToken(token);
     }
-    LOG.debug("Connecting to " + serviceAddr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to Tez AM at " + serviceAddr);
+    }
     DAGClientAMProtocolBlockingPB proxy = null;
     try {
       proxy = userUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index e452616..62546e9 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -24,13 +24,16 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -40,6 +43,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
@@ -63,6 +67,7 @@ public class TezSession {
   private boolean sessionStopped = false;
   /** Tokens which will be required for all DAGs submitted to this session. */
   private Credentials sessionCredentials = new Credentials();
+  private long clientTimeout;
 
   public TezSession(String sessionName,
       ApplicationId applicationId,
@@ -91,6 +96,10 @@ public class TezSession {
         TezClientUtils.setupTezJarsLocalResources(
           sessionConfig.getTezConfiguration(), sessionCredentials);
 
+    clientTimeout = sessionConfig.getTezConfiguration().getInt(
+        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
+        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
+
     if (sessionConfig.getSessionResources() != null
       && !sessionConfig.getSessionResources().isEmpty()) {
       tezJarResources.putAll(sessionConfig.getSessionResources());
@@ -129,64 +138,44 @@ public class TezSession {
    */
   public synchronized DAGClient submitDAG(DAG dag)
     throws TezException, IOException, InterruptedException {
-    if (!sessionStarted) {
-      throw new SessionNotRunning("Session not started");
-    } else if (sessionStopped) {
-      throw new SessionNotRunning("Session stopped");
-    }
+    verifySessionStateForSubmission();
 
     String dagId;
     LOG.info("Submitting dag to TezSession"
-        + ", sessionName=" + sessionName
-        + ", applicationId=" + applicationId);
+      + ", sessionName=" + sessionName
+      + ", applicationId=" + applicationId);
 
     // Obtain DAG specific credentials.
     TezClientUtils.setupDAGCredentials(dag, sessionCredentials, sessionConfig.getTezConfiguration());
 
-    Map<String, String> environment = TezClientUtils
-        .createEnvironment(sessionConfig.getYarnConfiguration());
+    // setup env
+    String classpath = TezClientUtils
+        .getFrameworkClasspath(sessionConfig.getYarnConfiguration());
     for (Vertex v : dag.getVertices()) {
       Map<String, String> taskEnv = v.getTaskEnvironment();
       if (taskEnv == null) {
         taskEnv = Maps.newHashMap();
         v.setTaskEnvironment(taskEnv);
       }
-      for (Map.Entry<String, String> entry : environment.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (!taskEnv.containsKey(key)) {
-          taskEnv.put(key, value);
-        }
-      }
+      Apps.addToEnvironment(taskEnv,
+          ApplicationConstants.Environment.CLASSPATH.name(),
+          classpath);
     }
     
     DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
     SubmitDAGRequestProto requestProto =
         SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
 
-    DAGClientAMProtocolBlockingPB proxy;
-    long startTime = System.currentTimeMillis();
-    int timeout = sessionConfig.getTezConfiguration().getInt(
-        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
-        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
-    long endTime = startTime + (timeout * 1000);
-    while (true) {
-      proxy = TezClientUtils.getSessionAMProxy(yarnClient,
-          sessionConfig.getYarnConfiguration(), applicationId);
-      if (proxy != null) {
-        break;
-      }
-      Thread.sleep(100l);
-      if (timeout != -1 && System.currentTimeMillis() > endTime) {
-        try {
-          LOG.warn("DAG submission to session timed out, stopping session");
-          stop();
-        } catch (Throwable t) {
-          LOG.info("Got an exception when trying to stop session", t);
-        }
-        throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
-            + ", timed out after " + timeout + " seconds");
+    DAGClientAMProtocolBlockingPB proxy = waitForProxy();
+    if (proxy == null) {
+      try {
+        LOG.warn("DAG submission to session timed out, stopping session");
+        stop();
+      } catch (Throwable t) {
+        LOG.info("Got an exception when trying to stop session", t);
       }
+      throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
+          + ", timed out after " + clientTimeout + " seconds");
     }
 
     try {
@@ -287,4 +276,85 @@ public class TezSession {
     return TezSessionStatus.INITIALIZING;
   }
 
+  /**
+   * Inform the Session to pre-warm containers for upcoming DAGs.
+   * Can be invoked multiple times on the same session.
+   * A subsequent call will release containers that are not compatible with the
+   * new context.
+   * This function can only be invoked if there is no DAG running on the Session
+   * This function can be a no-op if the Session already holds the required
+   * number of containers.
+   * @param context Context for the pre-warm containers.
+   */
+  @Private
+  @InterfaceStability.Unstable
+  public void preWarm(PreWarmContext context)
+      throws IOException, TezException, InterruptedException {
+    verifySessionStateForSubmission();
+
+    try {
+      DAGClientAMProtocolBlockingPB proxy = waitForProxy();
+      if (proxy == null) {
+        throw new SessionNotRunning("Could not connect to Session within client"
+            + " timeout interval, timeoutSecs=" + clientTimeout);
+      }
+
+      String classpath = TezClientUtils
+        .getFrameworkClasspath(sessionConfig.getYarnConfiguration());
+      Map<String, String> contextEnv = context.getEnvironment();
+      Apps.addToEnvironment(contextEnv,
+        ApplicationConstants.Environment.CLASSPATH.name(),
+        classpath);
+
+      DAGClientAMProtocolRPC.PreWarmRequestProto.Builder
+        preWarmReqProtoBuilder =
+          DAGClientAMProtocolRPC.PreWarmRequestProto.newBuilder();
+      preWarmReqProtoBuilder.setPreWarmContext(
+        DagTypeConverters.convertPreWarmContextToProto(context));
+      proxy.preWarm(null, preWarmReqProtoBuilder.build());
+      while (true) {
+        try {
+          Thread.sleep(1000);
+          TezSessionStatus status = getSessionStatus();
+          if (status.equals(TezSessionStatus.READY)) {
+            break;
+          } else if (status.equals(TezSessionStatus.SHUTDOWN)) {
+            throw new SessionNotRunning("Could not connect to Session");
+          }
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    } catch (ServiceException e) {
+      throw new TezException(e);
+    }
+  }
+
+  private DAGClientAMProtocolBlockingPB waitForProxy()
+      throws IOException, TezException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + (clientTimeout * 1000);
+    DAGClientAMProtocolBlockingPB proxy = null;
+    while (true) {
+      proxy = TezClientUtils.getSessionAMProxy(yarnClient,
+          sessionConfig.getYarnConfiguration(), applicationId);
+      if (proxy != null) {
+        break;
+      }
+      Thread.sleep(100l);
+      if (clientTimeout != -1 && System.currentTimeMillis() > endTime) {
+        break;
+      }
+    }
+    return proxy;
+  }
+
+  private void verifySessionStateForSubmission() throws SessionNotRunning {
+    if (!sessionStarted) {
+      throw new SessionNotRunning("Session not started");
+    } else if (sessionStopped) {
+      throw new SessionNotRunning("Session stopped");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 4e69e1e..6d503f4 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -97,8 +97,8 @@ public class DAG { // FIXME rename to Topology
    * used if the client has already obtained some or all of the required
    * credentials.
    * 
-   * @param credentials
-   * @return
+   * @param credentials Credentials for the DAG
+   * @return this
    */
   public synchronized DAG setCredentials(Credentials credentials) {
     this.credentials = credentials;
@@ -128,7 +128,7 @@ public class DAG { // FIXME rename to Topology
    * can be provided by making multiple calls to the method.
    * 
    * Currently, credentials can only be fetched for HDFS and other
-   * {@link FileSystem} implementations.
+   * {@link org.apache.hadoop.fs.FileSystem} implementations.
    * 
    * @param uris
    *          a list of {@link URI}s

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 0a4668f..ed606c2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -61,11 +62,13 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PreWarmContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString.Output;
@@ -539,4 +542,101 @@ public class DagTypeConverters {
     }
   }
 
+  public static VertexLocationHint convertVertexLocationHintFromProto(
+    VertexLocationHintProto proto) {
+    List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>(
+      proto.getTaskLocationHintsCount());
+    for(PlanTaskLocationHint inputHint : proto.getTaskLocationHintsList()){
+      TaskLocationHint outputHint = new TaskLocationHint(
+        new HashSet<String>(inputHint.getHostList()),
+        new HashSet<String>(inputHint.getRackList()));
+      outputList.add(outputHint);
+    }
+
+    return new VertexLocationHint(proto.getNumTasks(), outputList);
+  }
+
+  public static VertexLocationHintProto convertVertexLocationHintToProto(
+      VertexLocationHint vertexLocationHint) {
+    VertexLocationHintProto.Builder builder =
+      VertexLocationHintProto.newBuilder();
+    builder.setNumTasks(vertexLocationHint.getNumTasks());
+    if (vertexLocationHint.getTaskLocationHints() != null) {
+      for (TaskLocationHint taskLocationHint :
+        vertexLocationHint.getTaskLocationHints()) {
+        PlanTaskLocationHint.Builder taskLHBuilder =
+          PlanTaskLocationHint.newBuilder();
+        if (taskLocationHint.getDataLocalHosts() != null) {
+          taskLHBuilder.addAllHost(taskLocationHint.getDataLocalHosts());
+        }
+        if (taskLocationHint.getRacks() != null) {
+          taskLHBuilder.addAllRack(taskLocationHint.getRacks());
+        }
+        builder.addTaskLocationHints(taskLHBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  public static PreWarmContextProto convertPreWarmContextToProto(
+      PreWarmContext preWarmContext) {
+    PreWarmContextProto.Builder builder = PreWarmContextProto.newBuilder();
+    builder.setProcessorDescriptor(
+      DagTypeConverters.convertToDAGPlan(
+        preWarmContext.getProcessorDescriptor()));
+    builder.setMemoryMb(preWarmContext.getResource().getMemory());
+    builder.setVirtualCores(preWarmContext.getResource().getVirtualCores());
+    if (preWarmContext.getLocalResources() != null) {
+      builder.setLocalResources(
+        DagTypeConverters.convertFromLocalResources(
+          preWarmContext.getLocalResources()));
+    }
+    if (preWarmContext.getEnvironment() != null) {
+      for (Map.Entry<String, String> entry :
+          preWarmContext.getEnvironment().entrySet()) {
+        builder.addEnvironmentSetting(
+          PlanKeyValuePair.newBuilder()
+            .setKey(entry.getKey())
+            .setValue(entry.getValue())
+            .build());
+      }
+    }
+    if (preWarmContext.getLocationHints() != null) {
+      builder.setLocationHints(
+        DagTypeConverters.convertVertexLocationHintToProto(
+          preWarmContext.getLocationHints()));
+    }
+    if (preWarmContext.getJavaOpts() != null) {
+      builder.setJavaOpts(preWarmContext.getJavaOpts());
+    }
+    return builder.build();
+  }
+
+  public static PreWarmContext convertPreWarmContextFromProto(
+      PreWarmContextProto proto) {
+    VertexLocationHint vertexLocationHint = null;
+    if (proto.hasLocationHints()) {
+      vertexLocationHint =
+          DagTypeConverters.convertVertexLocationHintFromProto(
+              proto.getLocationHints());
+    }
+    PreWarmContext context = new PreWarmContext(
+      DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+        proto.getProcessorDescriptor()),
+        Resource.newInstance(proto.getMemoryMb(), proto.getVirtualCores()),
+        vertexLocationHint);
+    if (proto.hasLocalResources()) {
+      context.setLocalResources(
+        DagTypeConverters.convertFromPlanLocalResources(
+          proto.getLocalResources()));
+    }
+    context.setEnvironment(
+      DagTypeConverters.createEnvironmentMapFromDAGPlan(
+        proto.getEnvironmentSettingList()));
+    if (proto.hasJavaOpts()) {
+      context.setJavaOpts(proto.getJavaOpts());
+    }
+    return context;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6f646b5..2a3cde9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -277,19 +277,6 @@ public class TezConfiguration extends Configuration {
       300;
 
   /**
-   * Session pre-warm related configuration options
-   */
-
-  public static final String TEZ_SESSION_PRE_WARM_PREFIX =
-    TEZ_SESSION_PREFIX + "pre-warm.";
-  public static final String TEZ_SESSION_PRE_WARM_ENABLED =
-    TEZ_SESSION_PRE_WARM_PREFIX + "enabled";
-  public static final boolean TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT = false;
-
-  public static final String TEZ_PRE_WARM_PB_PLAN_BINARY_PATH =
-      TEZ_SESSION_PRE_WARM_PREFIX + "dag-plan.pb.path";
-
-  /**
    * The queue name for all jobs being submitted as part of a session, or for
    * non session jobs.
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index e658763..b7a2c60 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -234,3 +234,18 @@ message TezCountersProto {
 enum StatusGetOptsProto {
   GET_COUNTERS = 0;
 }
+
+message VertexLocationHintProto {
+  optional int32 num_tasks = 1;
+  repeated PlanTaskLocationHint task_location_hints = 2;
+}
+
+message PreWarmContextProto {
+  optional TezEntityDescriptorProto processor_descriptor = 1;
+  required int32 memoryMb = 2;
+  required int32 virtualCores = 3;
+  optional VertexLocationHintProto location_hints = 4;
+  optional PlanLocalResourcesProto localResources = 5;
+  repeated PlanKeyValuePair environmentSetting = 6;
+  optional string java_opts = 7;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 0f29364..525679e 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -87,6 +87,13 @@ message GetAMStatusResponseProto {
   required TezSessionStatusProto status = 1;
 }
 
+message PreWarmRequestProto {
+  optional PreWarmContextProto pre_warm_context = 1;
+}
+
+message PreWarmResponseProto {
+}
+
 service DAGClientAMProtocol {
   rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
   rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
@@ -95,4 +102,5 @@ service DAGClientAMProtocol {
   rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
   rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
   rpc getAMStatus (GetAMStatusRequestProto) returns (GetAMStatusResponseProto);
+  rpc preWarm (PreWarmRequestProto) returns (PreWarmResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 959fbbc..cca0827 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -27,7 +27,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto;
@@ -36,6 +35,8 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.PreWarmRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.PreWarmResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
@@ -152,4 +153,18 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
     }
   }
 
+  @Override
+  public DAGClientAMProtocolRPC.PreWarmResponseProto preWarm(
+    RpcController controller,
+    PreWarmRequestProto request) throws ServiceException {
+    try {
+      real.preWarmContainers(
+        DagTypeConverters.convertPreWarmContextFromProto(
+          request.getPreWarmContext()));
+      return PreWarmResponseProto.newBuilder().build();
+    } catch (TezException e) {
+      throw wrapException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8d70a05..0210a9a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -21,7 +21,6 @@ package org.apache.tez.dag.app;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PrintWriter;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -47,8 +46,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
@@ -77,6 +74,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -394,14 +392,20 @@ public class DAGAppMaster extends AbstractService {
         lastDAGCompletionTime = clock.getTime();
         switch(finishEvt.getDAGState()) {
         case SUCCEEDED:
-          successfulDAGs.incrementAndGet();
+          if (!currentDAG.getName().equals("PreWarmDAG")) {
+            successfulDAGs.incrementAndGet();
+          }
           break;
         case ERROR:
         case FAILED:
-          failedDAGs.incrementAndGet();
+          if (!currentDAG.getName().equals("PreWarmDAG")) {
+            failedDAGs.incrementAndGet();
+          }
           break;
         case KILLED:
-          killedDAGs.incrementAndGet();
+          if (!currentDAG.getName().equals("PreWarmDAG")) {
+            killedDAGs.incrementAndGet();
+          }
           break;
         default:
           LOG.fatal("Received a DAG Finished Event with state="
@@ -804,6 +808,49 @@ public class DAGAppMaster extends AbstractService {
     return currentDAG.getID().toString();
   }
 
+  /**
+   * Launches a single-vertex "PreWarmDAG" running the context's processor.
+   * Requests with no location hints or a non-positive task count are ignored.
+   * @throws TezException if a DAG is set and the AM state is not IDLE
+   */
+  synchronized void startPreWarmContainers(PreWarmContext preWarmContext)
+      throws TezException {
+    if (currentDAG != null && !state.equals(DAGAppMasterState.IDLE)) {
+      throw new TezException("App master already running a DAG");
+    }
+    // Null-check the hints before reading the container count from them.
+    int numContainers = preWarmContext.getLocationHints() == null
+        ? 0 : preWarmContext.getLocationHints().getNumTasks();
+    if (numContainers <= 0) {
+      LOG.warn("Ignoring pre-warm context as invalid numContainers specified: "
+          + numContainers);
+      return;
+    }
+    org.apache.tez.dag.api.Vertex preWarmVertex =
+        new org.apache.tez.dag.api.Vertex("PreWarmVertex",
+            preWarmContext.getProcessorDescriptor(), numContainers,
+            preWarmContext.getResource());
+    if (preWarmContext.getEnvironment() != null) {
+      preWarmVertex.setTaskEnvironment(preWarmContext.getEnvironment());
+    }
+    if (preWarmContext.getLocalResources() != null) {
+      preWarmVertex.setTaskLocalResources(preWarmContext.getLocalResources());
+    }
+    preWarmVertex.setTaskLocationsHint(
+        preWarmContext.getLocationHints().getTaskLocationHints());
+    if (preWarmContext.getJavaOpts() != null) {
+      preWarmVertex.setJavaOpts(preWarmContext.getJavaOpts());
+    }
+    org.apache.tez.dag.api.DAG dag =
+        new org.apache.tez.dag.api.DAG("PreWarmDAG");
+    dag.addVertex(preWarmVertex);
+    LOG.info("Pre-warming containers"
+        + ", processor=" + preWarmContext.getProcessorDescriptor().getClassName()
+        + ", numContainers=" + numContainers
+        + ", containerResource=" + preWarmContext.getResource());
+    startDAG(dag.createDag(amConf));
+  }
+
   public class DAGClientHandler {
 
     public List<String> getAllDAGs() throws TezException {
@@ -886,6 +933,12 @@ public class DAGAppMaster extends AbstractService {
       }
       return TezSessionStatus.INITIALIZING;
     }
+
+    public synchronized void preWarmContainers(PreWarmContext preWarmContext)
+        throws TezException {
+      startPreWarmContainers(preWarmContext); // method of the enclosing DAGAppMaster
+    }
+
   }
 
   private class RunningAppContext implements AppContext {
@@ -1214,65 +1267,17 @@ public class DAGAppMaster extends AbstractService {
     if (!isSession) {
       startDAG();
     } else {
-      boolean preWarmContainersEnabled = amConf.getBoolean(
-        TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED,
-        TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT);
-
-      boolean ranPreWarmContainersDAG = false;
-      if (preWarmContainersEnabled) {
-        ranPreWarmContainersDAG = runPreWarmContainersDAG();
-      }
-
-      if (!ranPreWarmContainersDAG) {
-        LOG.info("In Session mode. Waiting for DAG over RPC");
-        this.state = DAGAppMasterState.IDLE;
-
-        this.dagSubmissionTimer = new Timer(true);
-        this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
-          @Override
-          public void run() {
-            checkAndHandleSessionTimeout();
-          }
-        }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
-      }
-    }
-  }
-
-  private boolean runPreWarmContainersDAG() throws Exception {
-
-    InputStream dagPBBinaryStream = null;
-    try {
-      DAGPlan preWarmDAGPlan = null;
-      String preWarmDAGPlanPathStr =
-        amConf.get(TezConfiguration.TEZ_PRE_WARM_PB_PLAN_BINARY_PATH);
-      if (preWarmDAGPlanPathStr == null
-            || preWarmDAGPlanPathStr.isEmpty()) {
-        LOG.info("No path to pre-warm DAG plan specified");
-        return false;
-      }
-
-      LOG.info("Trying to run pre-warm DAG plan from specified path: "
-          + preWarmDAGPlanPathStr);
-
-      FileSystem fs = FileSystem.get(amConf);
-      Path preWarmDAGPlanPath = new Path(preWarmDAGPlanPathStr);
-      if (!fs.exists(preWarmDAGPlanPath)) {
-        LOG.info("Could not find pre-warm DAG plan file, path="
-          + preWarmDAGPlanPathStr);
-        return false;
-      }
-
-      // Read the protobuf-based pre-warm DAG plan
-      dagPBBinaryStream = fs.open(preWarmDAGPlanPath);
-      preWarmDAGPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-      startDAG(preWarmDAGPlan);
-
-    } finally {
-      if (dagPBBinaryStream != null) {
-        dagPBBinaryStream.close();
-      }
+      LOG.info("In Session mode. Waiting for DAG over RPC");
+      this.state = DAGAppMasterState.IDLE;
+
+      this.dagSubmissionTimer = new Timer(true);
+      this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
+        @Override
+        public void run() {
+          checkAndHandleSessionTimeout();
+        }
+      }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
     }
-    return true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e51cb85/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index f6fe7dd..7afc667 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.examples;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -49,9 +50,13 @@ import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
@@ -69,6 +74,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -83,6 +89,7 @@ import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
 
 /**
  * An MRR job built on top of word count to return words sorted by
@@ -216,8 +223,8 @@ public class OrderedWordCount {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
-            TextInputFormat.class.getName());
+    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
+      TextInputFormat.class.getName());
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapPayload),
@@ -408,6 +415,45 @@ public class OrderedWordCount {
             stagingDir, dagIndex, inputPath, outputPath,
             generateSplitsInClient);
 
+        boolean doPreWarm = dagIndex == 1 && useTezSession
+            && conf.getBoolean("PRE_WARM_SESSION", true);
+        int preWarmNumContainers = 0;
+        if (doPreWarm) {
+          preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
+          if (preWarmNumContainers <= 0) {
+            doPreWarm = false;
+          }
+        }
+        if (doPreWarm) {
+          LOG.info("Pre-warming Session");
+          VertexLocationHint vertexLocationHint =
+              new VertexLocationHint(preWarmNumContainers, null);
+          ProcessorDescriptor sleepProcDescriptor =
+            new ProcessorDescriptor(SleepProcessor.class.getName());
+          SleepProcessor.SleepProcessorConfig sleepProcessorConfig =
+            new SleepProcessor.SleepProcessorConfig(4000);
+          sleepProcDescriptor.setUserPayload(
+            sleepProcessorConfig.toUserPayload());
+          PreWarmContext context = new PreWarmContext(sleepProcDescriptor,
+            dag.getVertex("initialmap").getTaskResource(),
+              vertexLocationHint);
+
+          Map<String, LocalResource> contextLocalRsrcs =
+            new TreeMap<String, LocalResource>();
+          contextLocalRsrcs.putAll(
+            dag.getVertex("initialmap").getTaskLocalResources());
+          Map<String, String> contextEnv = new TreeMap<String, String>();
+          contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
+          String contextJavaOpts =
+            dag.getVertex("initialmap").getJavaOpts();
+          context
+            .setLocalResources(contextLocalRsrcs)
+            .setJavaOpts(contextJavaOpts)
+            .setEnvironment(contextEnv);
+
+          tezSession.preWarm(context);
+        }
+
         if (useTezSession) {
           LOG.info("Waiting for TezSession to get into ready state");
           waitForTezSessionReady(tezSession);


Mime
View raw message