tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3077. TezClient.waitTillReady should support timeout. Contributed by Kuhu Shukla.
Date Mon, 18 Apr 2016 21:52:49 GMT
Repository: tez
Updated Branches:
  refs/heads/master b78a84a0b -> 53aa66117


TEZ-3077. TezClient.waitTillReady should support timeout. Contributed by Kuhu Shukla.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/53aa6611
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/53aa6611
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/53aa6611

Branch: refs/heads/master
Commit: 53aa6611779d03f34ac871d6fdfd640fba57f96d
Parents: b78a84a
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Apr 18 14:52:08 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Apr 18 14:52:08 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/client/TezClient.java   | 91 ++++++++++++++++--
 .../org/apache/tez/dag/api/SessionNotReady.java | 31 +++++++
 .../org/apache/tez/client/TestTezClient.java    | 98 +++++++++++++++++++-
 4 files changed, 209 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3944df2..831e7d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3077. TezClient.waitTillReady should support timeout.
   TEZ-3202. Reduce the memory need for jobs with high number of segments
   TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
   TEZ-3214. Tez UI 2: Pagination in All DAGs

http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index e6dd474..f359a26 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.NumberFormat;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
 
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.util.Time;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -61,6 +63,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.PreWarmVertex;
+import org.apache.tez.dag.api.SessionNotReady;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -808,25 +811,61 @@ public class TezClient {
    */
   @Unstable
   public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException {
+    preWarm(preWarmVertex, 0, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * API to help pre-allocate containers in session mode. In non-session mode
+   * this is ignored. The pre-allocated containers may be re-used by subsequent
+   * job DAGs to improve performance.
+   * The preWarm vertex should be configured and setup exactly
+   * like the other vertices in the job DAGs so that the pre-allocated
+   * containers may be re-used by the subsequent DAGs to improve performance.
+   * The processor for the preWarmVertex may be used to pre-warm the containers
+   * by pre-loading classes etc. It should be short-running so that pre-warming
+   * does not block real execution. Users can specify their custom processors or
+   * use the PreWarmProcessor from the runtime library.
+   * The parallelism of the preWarmVertex will determine the number of preWarmed
+   * containers.
+   * Pre-warming is best efforts and among other factors is limited by the free
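+   * resources on the cluster. If the session is not READY after the specified
+   * timeout, a SessionNotReady exception is thrown and no DAG is submitted.
+   * @param preWarmVertex the vertex added to the pre-warm DAG
+   * @param timeout maximum time to wait for READY; 0 waits with no time limit
+   * @param unit time unit of the timeout argument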
+   * @throws TezException
+   * @throws IOException
+   */
+  @Unstable
+  public synchronized void preWarm(PreWarmVertex preWarmVertex,
+      long timeout, TimeUnit unit)
+      throws TezException, IOException {
     if (!isSession) {
-      // do nothing for non session mode. This is there to let the code 
+      // do nothing for non session mode. This is there to let the code
       // work correctly in both modes
-      LOG.warn("preWarm is not supported in non-session mode, please use session-mode of TezClient");
+      LOG.warn("preWarm is not supported in non-session mode," +
+          "please use session-mode of TezClient");
       return;
     }
-    
+
     verifySessionStateForSubmission();
     
     DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
         + preWarmDAGCounter++);
     dag.addVertex(preWarmVertex);
 
+    boolean isReady;
     try {
-      waitTillReady();
+      isReady = waitTillReady(timeout, unit);
     } catch (InterruptedException e) {
-      throw new IOException("Interrupted while waiting for AM to become available", e);
+      throw new IOException("Interrupted while waiting for AM to become " +
+          "available", e);
+    }
+    if(isReady) {
+      submitDAG(dag);
+    } else {
+      throw new SessionNotReady("Tez AM not ready, could not submit DAG");
     }
-    submitDAG(dag);
   }
 
   
@@ -841,12 +880,34 @@ public class TezClient {
    */
   @Evolving
   public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
+    waitTillReady(0, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Wait till the DAG is ready to be submitted.
+   * In non-session mode this is a no-op since the application can be
+   * immediately submitted.
+   * In session mode, this waits for the session host to be ready to accept
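+   * a DAG and returns false if it is not ready within the specified timeout.
+   * @param timeout maximum time to wait; 0 waits until READY with no time limit
+   * @param unit time unit of the timeout argument
+   * @return true if READY or not in session mode, false if the wait timed out.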
+   * @throws IOException
+   * @throws TezException
+   * @throws InterruptedException
+   */
+  @Evolving
+  public synchronized boolean waitTillReady(long timeout, TimeUnit unit)
+      throws IOException, TezException, InterruptedException {
+    timeout = unit.toMillis(timeout);
     if (!isSession) {
       // nothing to wait for in non-session mode
-      return;
+      return true;
     }
 
     verifySessionStateForSubmission();
+    long startTime = Time.monotonicNow();
+    long timeLimit = startTime + timeout;
     while (true) {
       TezAppMasterStatus status = getAppMasterStatus();
       if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
@@ -854,9 +915,19 @@ public class TezClient {
             + ((diagnostics != null) ? diagnostics : NO_CLUSTER_DIAGNOSTICS_MSG));
       }
       if (status.equals(TezAppMasterStatus.READY)) {
-        return;
+        return true;
+      }
+      if (timeout == 0) {
+        Thread.sleep(SLEEP_FOR_READY);
+        continue;
+      }
+      long now = Time.monotonicNow();
+      if (timeLimit > now) {
+        long sleepTime = Math.min(SLEEP_FOR_READY, timeLimit - now);
+        Thread.sleep(sleepTime);
+      } else {
+        return false;
       }
-      Thread.sleep(SLEEP_FOR_READY);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java
new file mode 100644
index 0000000..8c06a4d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Exception thrown when the Tez Session is not ready
+ */
+public class SessionNotReady extends TezException {
+
+  private static final long serialVersionUID = -287996170505550317L;
+
+  public SessionNotReady(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/53aa6611/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 583fb79..42b762c 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -25,6 +25,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
@@ -37,9 +39,11 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -49,6 +53,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -66,6 +71,7 @@ import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.SessionNotReady;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConfigurationConstants;
@@ -362,7 +368,95 @@ public class TestTezClient {
 
     client.stop();
   }
-  
+
+  @Test (timeout=30000)
+  public void testPreWarmWithTimeout() throws Exception {
+    long startTime = 0 , endTime = 0;
+    TezClientForTest client = configureAndCreateTezClient();
+    final TezClientForTest spyClient = spy(client);
+    doCallRealMethod().when(spyClient).start();
+    doCallRealMethod().when(spyClient).stop();
+    spyClient.start();
+
+    when(
+        spyClient.mockYarnClient.getApplicationReport(
+            spyClient.mockAppId).getYarnApplicationState())
+        .thenReturn(YarnApplicationState.RUNNING);
+    when(
+        spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
+            (GetAMStatusRequestProto) any()))
+        .thenReturn(
+            GetAMStatusResponseProto.newBuilder().setStatus(
+                TezAppMasterStatusProto.INITIALIZING).build());
+    PreWarmVertex vertex =
+        PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1));
+    int timeout = 5000;
+    try {
+      startTime = Time.monotonicNow();
+      spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
+      fail("PreWarm should have encountered an Exception!");
+    } catch (SessionNotReady te) {
+      endTime = Time.monotonicNow();
+      assertTrue("Time taken is not as expected",
+          (endTime - startTime) > timeout);
+      verify(spyClient, times(0)).submitDAG(any(DAG.class));
+      Assert.assertTrue("Unexpected Exception message",
+          te.getMessage().contains("Tez AM not ready"));
+
+    }
+
+    when(
+        spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
+            (GetAMStatusRequestProto) any()))
+        .thenReturn(
+            GetAMStatusResponseProto.newBuilder().setStatus(
+                TezAppMasterStatusProto.READY).build());
+    try {
+      startTime = Time.monotonicNow();
+      spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
+      endTime = Time.monotonicNow();
+      assertTrue("Time taken is not as expected",
+          (endTime - startTime) <= timeout);
+      verify(spyClient, times(1)).submitDAG(any(DAG.class));
+    } catch (TezException te) {
+      fail("PreWarm should have succeeded!");
+    }
+    Thread amStateThread = new Thread() {
+      @Override
+      public void run() {
+          CountDownLatch latch = new CountDownLatch(1);
+          try {
+            when(
+                spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
+                    (GetAMStatusRequestProto) any()))
+                .thenReturn(
+                    GetAMStatusResponseProto.newBuilder().setStatus(
+                        TezAppMasterStatusProto.INITIALIZING).build());
+            latch.await(1000, TimeUnit.MILLISECONDS);
+            when(
+                spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
+                    (GetAMStatusRequestProto) any()))
+                .thenReturn(
+                    GetAMStatusResponseProto.newBuilder().setStatus(
+                        TezAppMasterStatusProto.READY).build());
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (ServiceException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+    amStateThread.start();
+    startTime = Time.monotonicNow();
+    spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
+    endTime = Time.monotonicNow();
+    assertTrue("Time taken is not as expected",
+        (endTime - startTime) <= timeout);
+    verify(spyClient, times(2)).submitDAG(any(DAG.class));
+    spyClient.stop();
+    client.stop();
+  }
+
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {
     testMultipleSubmissionsJob(false);


Mime
View raw message