tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-518. Enforce a timeout on the client when submitting to a tez session. (hitesh)
Date Tue, 01 Oct 2013 20:39:54 GMT
Updated Branches:
  refs/heads/master 04d5e3771 -> ee5d5f87d


TEZ-518. Enforce a timeout on the client when submitting to a tez session. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ee5d5f87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ee5d5f87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ee5d5f87

Branch: refs/heads/master
Commit: ee5d5f87dfb71ba3b74b2708948be80857553f39
Parents: 04d5e37
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Oct 1 13:39:39 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Oct 1 13:39:39 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  5 ++-
 .../java/org/apache/tez/client/TezSession.java  | 46 +++++++++++++++++++-
 .../tez/dag/api/DAGSubmissionTimedOut.java      | 32 ++++++++++++++
 .../apache/tez/dag/api/SessionNotRunning.java   | 41 +++++++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java    | 25 +++++++++++
 5 files changed, 145 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee5d5f87/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8689385..ff07142 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
@@ -512,7 +513,7 @@ public class TezClientUtils {
     return textPath;
   }
 
-  static DAGClientAMProtocolBlockingPB getAMProxy(YarnClient yarnClient,
+  static DAGClientAMProtocolBlockingPB getSessionAMProxy(YarnClient yarnClient,
       Configuration conf,
       ApplicationId applicationId) throws TezException, IOException {
     ApplicationReport appReport;
@@ -529,7 +530,7 @@ public class TezClientUtils {
         if (appState == YarnApplicationState.FINISHED
             || appState == YarnApplicationState.KILLED
             || appState == YarnApplicationState.FAILED) {
-          throw new TezUncheckedException("Application not running"
+          throw new SessionNotRunning("Application not running"
               + ", applicationId=" + applicationId
               + ", yarnApplicationState=" + appReport.getYarnApplicationState()
               + ", finalApplicationStatus="

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee5d5f87/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index dd7fcab..c3a6e75 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -55,6 +57,7 @@ public class TezSession {
   private YarnClient yarnClient;
   private Map<String, LocalResource> tezJarResources;
   private boolean sessionStarted = false;
+  private boolean sessionStopped = false;
 
   public TezSession(String sessionName,
       ApplicationId applicationId,
@@ -69,6 +72,11 @@ public class TezSession {
     this(sessionName, null, sessionConfig);
   }
 
+  /**
+   * Start a Tez Session
+   * @throws TezException
+   * @throws IOException
+   */
   public synchronized void start() throws TezException, IOException {
     yarnClient = YarnClient.createYarnClient();
     yarnClient.init(sessionConfig.getYarnConfiguration());
@@ -99,10 +107,23 @@ public class TezSession {
     sessionStarted = true;
   }
 
+  /**
+   * Submit a DAG to a Tez Session. Blocks until either the DAG is submitted to
+   * the session or the configured timeout period expires. Stops the session if
+   * the submission times out.
+   * @param dag DAG to be submitted to Session
+   * @return DAGClient to monitor the DAG
+   * @throws TezException
+   * @throws IOException
+   * @throws SessionNotRunning if session is not alive
+   * @throws DAGSubmissionTimedOut if submission timed out
+   */
   public synchronized DAGClient submitDAG(DAG dag)
       throws TezException, IOException {
     if (!sessionStarted) {
       throw new TezUncheckedException("Session not started");
+    } else if (sessionStopped) {
+      throw new TezUncheckedException("Session stopped");
     }
 
     String dagId = null;
@@ -122,9 +143,14 @@ public class TezSession {
         SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
 
     DAGClientAMProtocolBlockingPB proxy;
+    long startTime = System.currentTimeMillis();
+    int timeout = sessionConfig.getTezConfiguration().getInt(
+        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
+        TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
+    long endTime = startTime + (timeout * 1000);
     while (true) {
       // FIXME implement a max time to wait for submit
-      proxy = TezClientUtils.getAMProxy(yarnClient,
+      proxy = TezClientUtils.getSessionAMProxy(yarnClient,
           sessionConfig.getYarnConfiguration(), applicationId);
       if (proxy != null) {
         break;
@@ -134,6 +160,16 @@ public class TezSession {
       } catch (InterruptedException e) {
         // Ignore
       }
+      if (System.currentTimeMillis() > endTime) {
+        try {
+          LOG.warn("DAG submission to session timed out, stopping session");
+          stop();
+        } catch (Throwable t) {
+          LOG.info("Got an exception when trying to stop session", t);
+        }
+        throw new DAGSubmissionTimedOut("Could not submit DAG to Tez Session"
+            + ", timed out after " + timeout + " seconds");
+      }
     }
 
     try {
@@ -149,12 +185,18 @@ public class TezSession {
         sessionConfig.getTezConfiguration());
   }
 
+  /**
+   * Shutdown a Tez Session.
+   * @throws TezException
+   * @throws IOException
+   */
   public synchronized void stop() throws TezException, IOException {
     LOG.info("Shutting down Tez Session"
         + ", sessionName=" + sessionName
         + ", applicationId=" + applicationId);
+    sessionStopped = true;
     try {
-      DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(
+      DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getSessionAMProxy(
           yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
       if (proxy != null) {
         ShutdownSessionRequestProto request =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee5d5f87/tez-api/src/main/java/org/apache/tez/dag/api/DAGSubmissionTimedOut.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAGSubmissionTimedOut.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAGSubmissionTimedOut.java
new file mode 100644
index 0000000..0622ed5
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAGSubmissionTimedOut.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Exception thrown when DAG submission to a Tez Session times out.
+ */
+public class DAGSubmissionTimedOut extends TezException {
+
+  private static final long serialVersionUID = 8521202283261990622L;
+
+  public DAGSubmissionTimedOut(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee5d5f87/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotRunning.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotRunning.java b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotRunning.java
new file mode 100644
index 0000000..e8a274c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/SessionNotRunning.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Exception thrown when the client cannot communicate with the Tez Session
+ * as the Tez Session is no longer running.
+ */
+public class SessionNotRunning extends TezException {
+
+  private static final long serialVersionUID = -287996170505550316L;
+
+  public SessionNotRunning(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SessionNotRunning(String message) {
+    super(message);
+  }
+
+  public SessionNotRunning(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee5d5f87/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7447974..e807636 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -220,4 +220,29 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*";
 
   public static final String LOCAL_FRAMEWORK_NAME = "local-tez";
+
+  /**
+   * Session-related properties
+   */
+  public static final String TEZ_SESSION_PREFIX =
+      TEZ_PREFIX + "session.";
+
+  /**
+   * Time (in seconds) for which the client waits for the AM to come up when
+   * trying to submit a DAG.
+   */
+  public static final String TEZ_SESSION_CLIENT_TIMEOUT_SECS =
+      TEZ_SESSION_PREFIX + "client.timeout.secs";
+  public static final int TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT =
+      120;
+
+  /**
+   * Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before
+   * shutting down.
+   */
+  public static final String TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS =
+      TEZ_SESSION_PREFIX + "am.dag.submit.timeout.secs";
+  public static final int TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT =
+      300;
+
 }


Mime
View raw message