tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] git commit: TEZ-1372. Fix preWarm to work after recent API changes (bikas)
Date Fri, 08 Aug 2014 20:18:25 GMT
Repository: tez
Updated Branches:
  refs/heads/master 5328978f4 -> 805427186


TEZ-1372. Fix preWarm to work after recent API changes (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/67415822
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/67415822
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/67415822

Branch: refs/heads/master
Commit: 674158224b80f3eff1cbf0cb46304813a18634b6
Parents: 5328978
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Aug 8 13:17:53 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Aug 8 13:17:53 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/PreWarmContext.java   | 142 -------------------
 .../java/org/apache/tez/client/TezClient.java   |  86 +++++------
 .../apache/tez/dag/api/DagTypeConverters.java   |  65 ---------
 tez-api/src/main/proto/DAGApiRecords.proto      |  11 --
 .../src/main/proto/DAGClientAMProtocol.proto    |   8 --
 .../org/apache/tez/client/TestTezClient.java    | 100 +++++++------
 .../tez/dag/api/client/DAGClientHandler.java    |  12 --
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  16 ---
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  49 +------
 .../dag/api/client/TestDAGClientHandler.java    |  15 +-
 .../examples/TestOrderedWordCount.java          |  36 ++---
 12 files changed, 109 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 228e682..5faca2f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,6 +50,7 @@ INCOMPATIBLE CHANGES
   TEZ-1382. Change ObjectRegistry API to allow for future extensions
   TEZ-1386. TezGroupedSplitsInputFormat should not need to be setup to enable grouping.
   TEZ-1394. Create example code for OrderedWordCount
+  TEZ-1372. Fix preWarm to work after recent API changes
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java b/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
deleted file mode 100644
index 4b5ab15..0000000
--- a/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.client;
-
-import javax.annotation.Nullable;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.VertexLocationHint;
-
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Context to define how the pre-warm containers should be launched within a
- * session.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class PreWarmContext {
-
-  private final ProcessorDescriptor processorDescriptor;
-  private final Resource resource;
-  private final int numTasks;
-  private final VertexLocationHint locationHints;
-  private Map<String, LocalResource> localResources;
-  private Map<String, String> environment;
-  private String javaOpts;
-
-  /**
-   * Context to define how to pre-warm a TezSession.
-   * 
-   * @param processorDescriptor
-   *          The processor to run within a Tez Task after launching a container
-   * @param resource
-   *          The resource requirements for each container
-   * @param numTasks
-   *          The number of tasks to run. The num of tasks can drive how many
-   *          containers are launched. However, as containers are re-used, the
-   *          total number of launched containers will likely be less than the
-   *          specified number of tasks.
-   * @param locationHints
-   *          The location hints for the containers to be launched.
-   * 
-   */
-  public PreWarmContext(ProcessorDescriptor processorDescriptor,
-      Resource resource,
-      int numTasks,
-      @Nullable VertexLocationHint locationHints) {
-    checkNotNull(processorDescriptor, "processorDescriptor is null");
-    checkNotNull(resource, "resource is null");
-    this.processorDescriptor =  processorDescriptor;
-    this.resource = resource;
-    this.numTasks = numTasks;
-    this.locationHints = locationHints;
-  }
-
-  /**
-   * Set the LocalResources for the pre-warm containers.
-   * @param localResources LocalResources for the container
-   * @return this
-   */
-  public PreWarmContext setLocalResources(
-      Map<String, LocalResource> localResources) {
-    this.localResources = localResources;
-    return this;
-  }
-
-
-  /**
-   * Set the Environment for the pre-warm containers.
-   * @param environment Container environment
-   * @return this
-   */
-  public PreWarmContext setEnvironment(
-      Map<String, String> environment) {
-    this.environment = environment;
-    return this;
-  }
-
-  /**
-   * Set the Java opts for the pre-warm containers.
-   * @param javaOpts Container java opts
-   * @return this
-   */
-  public PreWarmContext setJavaOpts(String javaOpts) {
-    this.javaOpts = javaOpts;
-    return this;
-  }
-
-  public ProcessorDescriptor getProcessorDescriptor() {
-    return processorDescriptor;
-  }
-
-  public Resource getResource() {
-    return resource;
-  }
-
-  public int getNumTasks() {
-    return numTasks;
-  }
-
-  @Nullable
-  public VertexLocationHint getLocationHints() {
-    return locationHints;
-  }
-
-  @Nullable
-  public Map<String, LocalResource> getLocalResources() {
-    return localResources;
-  }
-
-  @Nullable
-  public Map<String, String> getEnvironment() {
-    return environment;
-  }
-
-  @Nullable
-  public String getJavaOpts() {
-    return javaOpts;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 6cbdd6d..7a515fa 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -47,7 +47,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
@@ -98,6 +97,8 @@ public class TezClient {
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
+  
+  private int preWarmDAGCounter = 0;
 
   /**
    * Create a new TezClient. Session or non-session execution mode will be
@@ -514,59 +515,46 @@ public class TezClient {
     }
     return TezAppMasterStatus.INITIALIZING;
   }
-
+  
   /**
-   * Inform the Session to pre-warm containers for upcoming DAGs.
-   * Can be invoked multiple times on the same session.
-   * A subsequent call will release containers that are not compatible with the
-   * new context.
-   * This function can only be invoked if there is no DAG running on the Session
-   * This function can be a no-op if the Session already holds the required
-   * number of containers.
-   * @param context Context for the pre-warm containers.
+   * API to help pre-allocate containers in session mode. In non-session mode
+   * this is ignored. The pre-allocated containers may be re-used by subsequent 
+   * job DAGs to improve performance. 
+   * The preWarm vertex should be configured and setup exactly
+   * like the other vertices in the job DAGs so that the pre-allocated containers 
+   * may be re-used by the subsequent DAGs to improve performance.
+   * The processor for the preWarmVertex may be used to pre-warm the containers
+   * by pre-loading classes etc. It should be short-running so that pre-warming 
+   * does not block real execution. Users can specify their custom processors or
+   * use the PreWarmProcessor from the runtime library.
+   * The parallelism of the preWarmVertex will determine the number of preWarmed
+   * containers.
+   * Pre-warming is best efforts and among other factors is limited by the free 
+   * resources on the cluster.
+   * @param preWarmVertex
+   * @throws TezException
+   * @throws IOException
+   * @throws InterruptedException
    */
-  @Private
   @InterfaceStability.Unstable
-  public void preWarm(PreWarmContext context)
-      throws IOException, TezException, InterruptedException {
-    verifySessionStateForSubmission();
-
-    try {
-      DAGClientAMProtocolBlockingPB proxy = waitForProxy();
-      if (proxy == null) {
-        throw new SessionNotRunning("Could not connect to Session within client"
-            + " timeout interval, timeoutSecs=" + clientTimeout);
-      }
-
-      Map<String, String> contextEnv = context.getEnvironment();
-      TezYARNUtils.setupDefaultEnv(contextEnv, amConfig.getTezConfiguration(),
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV,
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT,
-          TezClientUtils.usingTezLibsFromArchive(getTezJarResources(sessionCredentials)));
-
-      DAGClientAMProtocolRPC.PreWarmRequestProto.Builder
-        preWarmReqProtoBuilder =
-          DAGClientAMProtocolRPC.PreWarmRequestProto.newBuilder();
-      preWarmReqProtoBuilder.setPreWarmContext(
-        DagTypeConverters.convertPreWarmContextToProto(context));
-      proxy.preWarm(null, preWarmReqProtoBuilder.build());
-      while (true) {
-        try {
-          Thread.sleep(1000);
-          TezAppMasterStatus status = getAppMasterStatus();
-          if (status.equals(TezAppMasterStatus.READY)) {
-            break;
-          } else if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
-            throw new SessionNotRunning("Could not connect to Session");
-          }
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-    } catch (ServiceException e) {
-      throw new TezException(e);
+  public void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException, InterruptedException
{
+    if (!isSession) {
+      // do nothing for non session mode. This is there to let the code 
+      // work correctly in both modes
+      return;
     }
+    
+    verifySessionStateForSubmission();
+    
+    DAG dag = new org.apache.tez.dag.api.DAG(TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX
+ "_"
+        + preWarmDAGCounter++);
+    dag.addVertex(preWarmVertex);
+    
+    waitTillReady();
+    
+    submitDAG(dag);
   }
+
   
   /**
    * Wait till the DAG is ready to be submitted.

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index dbec44d..c81fcb6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUserPayload;
@@ -65,7 +64,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
-import org.apache.tez.dag.api.records.DAGProtos.PreWarmContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
@@ -620,69 +618,6 @@ public class DagTypeConverters {
     return builder.build();
   }
 
-  public static PreWarmContextProto convertPreWarmContextToProto(
-      PreWarmContext preWarmContext) {
-    PreWarmContextProto.Builder builder = PreWarmContextProto.newBuilder();
-    builder.setProcessorDescriptor(
-      DagTypeConverters.convertToDAGPlan(
-        preWarmContext.getProcessorDescriptor()));
-    builder.setNumTasks(preWarmContext.getNumTasks());
-    builder.setMemoryMb(preWarmContext.getResource().getMemory());
-    builder.setVirtualCores(preWarmContext.getResource().getVirtualCores());
-    if (preWarmContext.getLocalResources() != null) {
-      builder.setLocalResources(
-        DagTypeConverters.convertFromLocalResources(
-          preWarmContext.getLocalResources()));
-    }
-    if (preWarmContext.getEnvironment() != null) {
-      for (Map.Entry<String, String> entry :
-          preWarmContext.getEnvironment().entrySet()) {
-        builder.addEnvironmentSetting(
-          PlanKeyValuePair.newBuilder()
-            .setKey(entry.getKey())
-            .setValue(entry.getValue())
-            .build());
-      }
-    }
-    if (preWarmContext.getLocationHints() != null) {
-      builder.setLocationHints(
-        DagTypeConverters.convertVertexLocationHintToProto(
-          preWarmContext.getLocationHints()));
-    }
-    if (preWarmContext.getJavaOpts() != null) {
-      builder.setJavaOpts(preWarmContext.getJavaOpts());
-    }
-    return builder.build();
-  }
-
-  public static PreWarmContext convertPreWarmContextFromProto(
-      PreWarmContextProto proto) {
-    VertexLocationHint vertexLocationHint = null;
-    if (proto.hasLocationHints()) {
-      vertexLocationHint =
-          DagTypeConverters.convertVertexLocationHintFromProto(
-              proto.getLocationHints());
-    }
-    PreWarmContext context = new PreWarmContext(
-      DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
-        proto.getProcessorDescriptor()),
-        Resource.newInstance(proto.getMemoryMb(), proto.getVirtualCores()),
-        proto.getNumTasks(),
-        vertexLocationHint);
-    if (proto.hasLocalResources()) {
-      context.setLocalResources(
-        DagTypeConverters.convertFromPlanLocalResources(
-          proto.getLocalResources()));
-    }
-    context.setEnvironment(
-      DagTypeConverters.createEnvironmentMapFromDAGPlan(
-        proto.getEnvironmentSettingList()));
-    if (proto.hasJavaOpts()) {
-      context.setJavaOpts(proto.getJavaOpts());
-    }
-    return context;
-  }
-
   public static TezUserPayload convertToTezUserPayload(@Nullable byte[] payload) {
     return new TezUserPayload(payload);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 13b25c0..1c7aa0b 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -240,14 +240,3 @@ enum StatusGetOptsProto {
 message VertexLocationHintProto {
   repeated PlanTaskLocationHint task_location_hints = 1;
 }
-
-message PreWarmContextProto {
-  optional TezEntityDescriptorProto processor_descriptor = 1;
-  required int32 memoryMb = 2;
-  required int32 virtualCores = 3;
-  optional VertexLocationHintProto location_hints = 4;
-  optional PlanLocalResourcesProto localResources = 5;
-  repeated PlanKeyValuePair environmentSetting = 6;
-  optional string java_opts = 7;
-  optional int32 num_tasks = 8;
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 8d33a19..aa0d938 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -88,13 +88,6 @@ message GetAMStatusResponseProto {
   required TezSessionStatusProto status = 1;
 }
 
-message PreWarmRequestProto {
-  optional PreWarmContextProto pre_warm_context = 1;
-}
-
-message PreWarmResponseProto {
-}
-
 service DAGClientAMProtocol {
   rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
   rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
@@ -103,5 +96,4 @@ service DAGClientAMProtocol {
   rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
   rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
   rpc getAMStatus (GetAMStatusRequestProto) returns (GetAMStatusResponseProto);
-  rpc preWarm (PreWarmRequestProto) returns (PreWarmResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 7b00c28..99a19c6 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -59,13 +60,13 @@ import org.mockito.ArgumentCaptor;
 import com.google.common.collect.Maps;
 import com.google.protobuf.RpcController;
 
-
-
 public class TestTezClient {
 
   class TezClientForTest extends TezClient {
-    TezYarnClient mockYarnClient;
+    TezYarnClient mockTezYarnClient;
     DAGClientAMProtocolBlockingPB sessionAmProxy;
+    YarnClient mockYarnClient;
+    ApplicationId mockAppId;
 
     public TezClientForTest(String name, TezConfiguration tezConf,
         @Nullable Map<String, LocalResource> localResources,
@@ -75,7 +76,7 @@ public class TestTezClient {
     
     @Override
     protected FrameworkClient createFrameworkClient() {
-      return mockYarnClient;
+      return mockTezYarnClient;
     }
     
     @Override
@@ -85,6 +86,19 @@ public class TestTezClient {
     }
   }
   
+  static void configure(TezClientForTest client) throws YarnException, IOException {
+    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
+    YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
+    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
+
+    DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class,
RETURNS_DEEP_STUBS);
+
+    client.sessionAmProxy = sessionAmProxy;
+    client.mockTezYarnClient = new TezYarnClient(yarnClient);
+    client.mockYarnClient = yarnClient;
+    client.mockAppId = appId1;
+  }
+  
   @Test
   public void testTezclientApp() throws Exception {
     testTezClient(false);
@@ -107,21 +121,14 @@ public class TestTezClient {
     
     TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
     
-    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
-    YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
-    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
-    ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
-
-    DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class,
RETURNS_DEEP_STUBS);
-    
-    client.sessionAmProxy = sessionAmProxy;
-    client.mockYarnClient = new TezYarnClient(yarnClient);
+    configure(client);
     
+    ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
     client.start();
-    verify(yarnClient, times(1)).init((Configuration)any());
-    verify(yarnClient, times(1)).start();
+    verify(client.mockYarnClient, times(1)).init((Configuration)any());
+    verify(client.mockYarnClient, times(1)).start();
     if (isSession) {
-      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size());
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
@@ -131,20 +138,20 @@ public class TestTezClient {
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           lrName1));
     } else {
-      verify(yarnClient, times(0)).submitApplication(captor.capture());
+      verify(client.mockYarnClient, times(0)).submitApplication(captor.capture());
     }
     
     DAG dag = new DAG("DAG").addVertex(
         new Vertex("Vertex", new ProcessorDescriptor("P"), 1, Resource.newInstance(1, 1)));
     DAGClient dagClient = client.submitDAG(dag);
     
-    Assert.assertEquals(appId1, dagClient.getApplicationId());
+    Assert.assertEquals(client.mockAppId, dagClient.getApplicationId());
     
     if (isSession) {
-      verify(yarnClient, times(1)).submitApplication(captor.capture());
-      verify(sessionAmProxy, times(1)).submitDAG((RpcController)any(), (SubmitDAGRequestProto)
any());
+      verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
+      verify(client.sessionAmProxy, times(1)).submitDAG((RpcController)any(), (SubmitDAGRequestProto)
any());
     } else {
-      verify(yarnClient, times(1)).submitApplication(captor.capture());
+      verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size());
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
@@ -165,7 +172,8 @@ public class TestTezClient {
     client.addAppMasterLocalResources(lrs);
     
     ApplicationId appId2 = ApplicationId.newInstance(0, 2);
-    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId2);
+    when(client.mockYarnClient.createApplication().getNewApplicationResponse().getApplicationId())
+        .thenReturn(appId2);
     
     dag = new DAG("DAG").addVertex(
         new Vertex("Vertex", new ProcessorDescriptor("P"), 1, Resource.newInstance(1, 1)));
@@ -173,18 +181,18 @@ public class TestTezClient {
     
     if (isSession) {
       // same app master
-      verify(yarnClient, times(1)).submitApplication(captor.capture());
-      Assert.assertEquals(appId1, dagClient.getApplicationId());
+      verify(client.mockYarnClient, times(1)).submitApplication(captor.capture());
+      Assert.assertEquals(client.mockAppId, dagClient.getApplicationId());
       // additional resource is sent
       ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
-      verify(sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture());
+      verify(client.sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture());
       SubmitDAGRequestProto proto = captor1.getValue();
       Assert.assertEquals(1, proto.getAdditionalAmResources().getLocalResourcesCount());
       Assert.assertEquals(lrName2, proto.getAdditionalAmResources().getLocalResources(0).getName());
     } else {
       // new app master
       Assert.assertEquals(appId2, dagClient.getApplicationId());
-      verify(yarnClient, times(2)).submitApplication(captor.capture());
+      verify(client.mockYarnClient, times(2)).submitApplication(captor.capture());
       // additional resource is added
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size());
@@ -200,36 +208,48 @@ public class TestTezClient {
           lrName2));
     }
     
-    
-    
     client.stop();
     if (isSession) {
-      verify(sessionAmProxy, times(1)).shutdownSession((RpcController) any(), (ShutdownSessionRequestProto)any());
+      verify(client.sessionAmProxy, times(1)).shutdownSession((RpcController) any(),
+          (ShutdownSessionRequestProto) any());
     }
-    verify(yarnClient, times(1)).stop();
+    verify(client.mockYarnClient, times(1)).stop();
   }
   
-  @Test(timeout = 5000)
-  public void testWaitTillReady_Interrupt() throws Exception {
+  public void testPreWarm() throws Exception {
     TezConfiguration conf = new TezConfiguration();
     conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
     Map<String, LocalResource> lrs = Maps.newHashMap();
     final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
 
-    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
-    YarnClient yarnClient = mock(YarnClient.class, RETURNS_DEEP_STUBS);
-    when(yarnClient.createApplication().getNewApplicationResponse().getApplicationId()).thenReturn(appId1);
-    ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
+    configure(client);
+    client.start();
+    PreWarmVertex vertex = new PreWarmVertex("PreWarm", 1, Resource.newInstance(1, 1));
+    client.preWarm(vertex);
+    
+    ArgumentCaptor<SubmitDAGRequestProto> captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class);
+    verify(client.sessionAmProxy, times(1)).submitDAG((RpcController)any(), (SubmitDAGRequestProto)
any());
+    SubmitDAGRequestProto proto = captor1.getValue();
+    Assert.assertTrue(proto.getDAGPlan().getName().startsWith(TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX));
 
-    DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class,
RETURNS_DEEP_STUBS);
+    client.stop();
+  }
+  
+  @Test(timeout = 5000)
+  public void testWaitTillReady_Interrupt() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    Map<String, LocalResource> lrs = Maps.newHashMap();
+    final TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
 
-    client.sessionAmProxy = sessionAmProxy;
-    client.mockYarnClient = new TezYarnClient(yarnClient);
+    configure(client);
 
     client.start();
 
-    when(yarnClient.getApplicationReport(appId1).getYarnApplicationState()).thenReturn(YarnApplicationState.NEW);
+    when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.NEW);
     final AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>();
     Thread thread = new Thread() {
       @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 6dca990..32c10aa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -26,15 +26,11 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.records.TezDAGID;
 
 public class DAGClientHandler {
@@ -133,12 +129,4 @@ public class DAGClientHandler {
     return TezAppMasterStatus.INITIALIZING;
   }
 
-  public synchronized void preWarmContainers(PreWarmContext preWarmContext)
-      throws TezException {
-    if (dagAppMaster == null) {
-      throw new TezException("DAG App Master is not initialized");
-    }
-    dagAppMaster.startPreWarmContainers(preWarmContext);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index d47bff0..074e67c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -38,8 +38,6 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.PreWarmRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.PreWarmResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
@@ -158,18 +156,4 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
       throw wrapException(e);
     }
   }
-
-  @Override
-  public DAGClientAMProtocolRPC.PreWarmResponseProto preWarm(
-    RpcController controller,
-    PreWarmRequestProto request) throws ServiceException {
-    try {
-      real.preWarmContainers(
-        DagTypeConverters.convertPreWarmContextFromProto(
-          request.getPreWarmContext()));
-      return PreWarmResponseProto.newBuilder().build();
-    } catch (TezException e) {
-      throw wrapException(e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0626fc8..135b252 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -88,7 +88,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtils;
@@ -962,8 +961,8 @@ public class DAGAppMaster extends AbstractService {
 
     // RPC server runs in the context of the job user as it was started in
     // the job user's UGI context
-    LOG.info("Starting DAG submitted via RPC");
-
+    LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("Invoked with additional local resources: " + additionalResources);
       
@@ -987,50 +986,6 @@ public class DAGAppMaster extends AbstractService {
     return currentDAG.getID().toString();
   }
 
-  public synchronized void startPreWarmContainers(PreWarmContext preWarmContext)
-      throws TezException {
-    // Check if there is a running DAG
-    if(currentDAG != null
-        && !state.equals(DAGAppMasterState.IDLE)) {
-      throw new TezException("App master already running a DAG");
-    }
-
-    // Kill current pre-warm DAG if needed
-    // Launch new pre-warm DAG
-
-    org.apache.tez.dag.api.DAG dag =
-      new org.apache.tez.dag.api.DAG(
-          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX +
-              Integer.toString(dagCounter.get() + 1));
-    if (preWarmContext.getNumTasks() <= 0) {
-      LOG.warn("Ignoring pre-warm context as invalid numContainers specified: "
-          + preWarmContext.getNumTasks());
-      return;
-    }
-    org.apache.tez.dag.api.Vertex preWarmVertex = new
-        org.apache.tez.dag.api.Vertex("PreWarmVertex",
-      preWarmContext.getProcessorDescriptor(),
-      preWarmContext.getNumTasks(), preWarmContext.getResource());
-    if (preWarmContext.getEnvironment() != null) {
-      preWarmVertex.setTaskEnvironment(preWarmContext.getEnvironment());
-    }
-    if (preWarmContext.getLocalResources() != null) {
-      preWarmVertex.setTaskLocalFiles(preWarmContext.getLocalResources());
-    }
-    if (preWarmContext.getLocationHints() != null) {
-      preWarmVertex.setLocationHint(preWarmContext.getLocationHints());
-    }
-    if (preWarmContext.getJavaOpts() != null) {
-      preWarmVertex.setTaskLaunchCmdOpts(preWarmContext.getJavaOpts());
-    }
-    dag.addVertex(preWarmVertex);
-    LOG.info("Pre-warming containers"
-        + ", processor=" + preWarmContext.getProcessorDescriptor().getClassName()
-        + ", numContainers=" + preWarmContext.getNumTasks()
-        + ", containerResource=" + preWarmContext.getResource());
-    startDAG(dag.createDag(amConf), null);
-  }
-
   @SuppressWarnings("unchecked")
   public void tryKillDAG(DAG dag){
     dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 4fb563c..3459fb9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -26,10 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -37,8 +33,6 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -48,7 +42,6 @@ import org.mockito.internal.util.collections.Sets;
 public class TestDAGClientHandler {
   
   @Test(timeout = 1000)
-  @SuppressWarnings("unchecked")
   public void testDAGClientHandler() throws TezException {
 
     TezDAGID mockTezDAGId = mock(TezDAGID.class);
@@ -112,13 +105,7 @@ public class TestDAGClientHandler {
     assertEquals(TezAppMasterStatus.INITIALIZING, dagClientHandler.getSessionStatus());
     when(mockDagAM.getState()).thenReturn(DAGAppMasterState.ERROR);
     assertEquals(TezAppMasterStatus.SHUTDOWN, dagClientHandler.getSessionStatus());
-    
-    
-    // startPreWarmContainers
-    PreWarmContext mockPreWarnContext = mock(PreWarmContext.class);
-    dagClientHandler.preWarmContainers(mockPreWarnContext);
-    verify(mockDagAM).startPreWarmContainers(mockPreWarnContext);
-    
+        
     // tryKillDAG
     try{
       dagClientHandler.tryKillDAG("dag_9999_0001_2");

http://git-wip-us.apache.org/repos/asf/tez/blob/67415822/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index baf1b57..b243186 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.PreWarmVertex;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -78,7 +78,6 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -413,32 +412,13 @@ public class TestOrderedWordCount extends Configured implements Tool
{
         }
         if (doPreWarm) {
           LOG.info("Pre-warming Session");
-          VertexLocationHint vertexLocationHint =
-              new VertexLocationHint(null);
-          ProcessorDescriptor sleepProcDescriptor =
-            new ProcessorDescriptor(SleepProcessor.class.getName());
-          SleepProcessor.SleepProcessorConfig sleepProcessorConfig =
-            new SleepProcessor.SleepProcessorConfig(4000);
-          sleepProcDescriptor.setUserPayload(
-            sleepProcessorConfig.toUserPayload());
-          PreWarmContext context = new PreWarmContext(sleepProcDescriptor,
-            dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers,
-              vertexLocationHint);
-
-          Map<String, LocalResource> contextLocalRsrcs =
-            new TreeMap<String, LocalResource>();
-          contextLocalRsrcs.putAll(
-            dag.getVertex("initialmap").getTaskLocalFiles());
-          Map<String, String> contextEnv = new TreeMap<String, String>();
-          contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
-          String contextJavaOpts =
-            dag.getVertex("initialmap").getTaskLaunchCmdOpts();
-          context
-            .setLocalResources(contextLocalRsrcs)
-            .setJavaOpts(contextJavaOpts)
-            .setEnvironment(contextEnv);
-
-          tezSession.preWarm(context);
+          PreWarmVertex preWarmVertex = new PreWarmVertex("PreWarm", preWarmNumContainers,
dag
+              .getVertex("initialmap").getTaskResource());
+          preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
+          preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
+          preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
+          
+          tezSession.preWarm(preWarmVertex);
         }
 
         if (useTezSession) {


Mime
View raw message