tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [06/29] tez git commit: TEZ-1967. Add a monitoring API on DAGClient which returns after a time interval or on DAG final state change. Contributed by Vasanth kumar RJ.
Date Wed, 04 Mar 2015 20:51:52 GMT
TEZ-1967. Add a monitoring API on DAGClient which returns after a time
interval or on DAG final state change. Contributed by Vasanth kumar RJ.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c8015c4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c8015c4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c8015c4

Branch: refs/heads/TEZ-2003
Commit: 9c8015c49c8d6059e60c7d29c834348cb27bb6df
Parents: 53259aa
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Feb 26 11:03:05 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Feb 26 11:03:05 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  9 ++
 .../apache/tez/dag/api/client/DAGClient.java    | 17 ++++
 .../tez/dag/api/client/DAGClientImpl.java       | 90 ++++++++++++++++++--
 .../dag/api/client/DAGClientTimelineImpl.java   |  6 ++
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 13 ++-
 .../src/main/proto/DAGClientAMProtocol.proto    |  1 +
 .../tez/dag/api/client/rpc/TestDAGClient.java   | 10 +--
 .../tez/dag/api/client/DAGClientHandler.java    |  5 ++
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  3 +-
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  3 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 29 +++++++
 .../apache/tez/dag/api/client/MRDAGClient.java  |  6 ++
 13 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0aae152..1dfe515 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1967. Add a monitoring API on DAGClient which returns after a time interval or on DAG
final state change.
   TEZ-2130. Send the sessionToken as part of the AM CLC.
   TEZ-1935. Organization should be removed from http://tez.apache.org/team-list.html.
   TEZ-2009. Change license/copyright headers to 2015.

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0bf78f9..8186f2a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1155,4 +1155,13 @@ public class TezConfiguration extends Configuration {
   static Set<String> getPropertySet() {
     return PropertyScope.keySet();
   }
+
+  /**
+   * Long value
+   * Status Poll interval in Milliseconds used when getting DAG status with timeout.
+   */
+  @ConfigurationScope(Scope.DAG)
+  public static final String TEZ_DAG_STATUS_POLLINTERVAL_MS = TEZ_PREFIX
+      + "dag.status.pollinterval-ms";
+  public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index d3de10f..13c8ce6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.tez.dag.api.TezException;
 
@@ -60,6 +61,22 @@ public abstract class DAGClient implements Closeable {
       throws IOException, TezException;
 
   /**
+   * Get the status of the specified DAG when it reaches a final state, or the timeout expires.
+   *
+   * @param statusOptions Optionally, retrieve additional information based on
+   *                      specified options. To retrieve basic information, this can be null
+   * @param timeout RPC call timeout. Value -1 waits for infinite and returns when
+   *                DAG reaches final state
+   * @return DAG Status
+   * @throws IOException
+   * @throws TezException
+   */
+  @Unstable
+  public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout)
+      throws IOException, TezException;
+
+  /**
    * Get the status of a Vertex of a DAG
    * @param statusOptions Optionally, retrieve additional information based on
    *                      specified options

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 87e64cd..b5bb599 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -69,6 +70,7 @@ public class DAGClientImpl extends DAGClient {
   private EnumSet<VertexStatus.State> vertexCompletionStates = EnumSet.of(
       VertexStatus.State.SUCCEEDED, VertexStatus.State.FAILED, VertexStatus.State.KILLED,
       VertexStatus.State.ERROR);
+  private long statusPollInterval;
 
   public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
                        @Nullable FrameworkClient frameworkClient) {
@@ -92,6 +94,13 @@ public class DAGClientImpl extends DAGClient {
     }
 
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
+    statusPollInterval = conf.getLong(
+        TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
+        TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
+    if(statusPollInterval < 0) {
+      LOG.error("DAG Status poll interval cannot be negative and setting to default value.");
+      statusPollInterval = TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT;
+    }
   }
 
   @Override
@@ -105,14 +114,77 @@ public class DAGClientImpl extends DAGClient {
   }
 
   @Override
-  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
-      TezException, IOException {
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      final long timeout) throws TezException, IOException {
+    long currentStatusPollInterval = statusPollInterval;
+    if(timeout >= 0 && currentStatusPollInterval > timeout) {
+      currentStatusPollInterval = timeout;
+    }
+    DAGStatus dagStatus = null;
+    if(cachedDagStatus != null) {
+      dagStatus = cachedDagStatus;
+    } else {
+      dagStatus = getDAGStatus(statusOptions);
+    }
+    //Handling when client dag status init or submitted
+    if (dagStatus.getState() == DAGStatus.State.INITING
+        || dagStatus.getState() == DAGStatus.State.SUBMITTED) {
+      boolean initOrSubmittedState = true;
+      long timeoutTime = System.currentTimeMillis() + timeout;
+      while (timeout < 0
+          || (timeout > 0 && timeoutTime > System.currentTimeMillis())) {
+        if (initOrSubmittedState) {
+          dagStatus = getDAGStatus(statusOptions);
+        }
+        if (dagStatus.getState() == DAGStatus.State.RUNNING) {
+          initOrSubmittedState = false;
+          // When RUNNING State, Check for AM status is also RUNNING
+          DAGStatus dagStatusFromAM = getDAGStatusViaAM(statusOptions, 0);
+          if (dagStatusFromAM != null) {
+            if (dagStatusFromAM.getState() == DAGStatus.State.RUNNING) {
+              long remainingTimeout = 0;
+              if (timeout <= 0) {
+                remainingTimeout = timeout;
+              } else {
+                if (timeoutTime > System.currentTimeMillis()) {
+                  remainingTimeout = timeoutTime - System.currentTimeMillis();
+                } else {
+                  return dagStatusFromAM;
+                }
+              }
+              dagStatus = getDAGStatusInternal(statusOptions, remainingTimeout);
+            } else {
+              dagStatus = dagStatusFromAM;
+            }
+            break;
+          }
+        }
+        if(dagStatus.getState() == DAGStatus.State.SUCCEEDED
+            || dagStatus.getState() == DAGStatus.State.FAILED
+            || dagStatus.getState() == DAGStatus.State.KILLED
+            || dagStatus.getState() == DAGStatus.State.ERROR) {
+          break;
+        }
+        try {
+          Thread.sleep(currentStatusPollInterval);
+        } catch (InterruptedException e) {
+          throw new TezException(e);
+        }
+      }// End of while
+      return dagStatus;
+    } else {
+      return getDAGStatusInternal(statusOptions, timeout);
+    }
+  }
+
+  private DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout) throws TezException, IOException {
 
     if (!dagCompleted) {
       // fetch from AM. on Error and while DAG is still not completed (could not reach AM,
AM got
       // killed). return cached status. This prevents the progress being reset (for ex fetching
from
       // RM does not give status).
-      final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+      final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
 
       if (!dagCompleted) {
         if (dagStatus != null) {
@@ -155,6 +227,12 @@ public class DAGClientImpl extends DAGClient {
   }
 
   @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
+      TezException, IOException {
+    return getDAGStatusInternal(statusOptions, 0);
+  }
+
+  @Override
   public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
throws
       IOException, TezException {
 
@@ -228,11 +306,11 @@ public class DAGClientImpl extends DAGClient {
     }
   }
 
-  private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions) throws
-      IOException {
+  private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout) throws IOException {
     DAGStatus dagStatus = null;
     try {
-      dagStatus = realClient.getDAGStatus(statusOptions);
+      dagStatus = realClient.getDAGStatus(statusOptions, timeout);
     } catch (DAGNotRunningException e) {
       dagCompleted = true;
     } catch (TezException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index ce88c93..702eead 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -505,4 +505,10 @@ public class DAGClientTimelineImpl extends DAGClient {
     }
   }
 
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout) throws IOException, TezException {
+    return getDAGStatus(statusOptions);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 2ac89d5..74e3b53 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -81,9 +81,16 @@ public class DAGClientRPCImpl extends DAGClient {
   @Override
   public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
       throws IOException, TezException {
+    return getDAGStatus(statusOptions, 0);
+  }
+
+
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout) throws IOException, TezException {
     if(createAMProxyIfNeeded()) {
       try {
-        DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+        DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
         return dagStatus;
       } catch (TezException e) {
         resetProxy(e); // create proxy again
@@ -149,14 +156,14 @@ public class DAGClientRPCImpl extends DAGClient {
     proxy = null;
   }
 
-  DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions)
+  DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
       throws IOException, TezException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
     }
     GetDAGStatusRequestProto.Builder requestProtoBuilder =
         GetDAGStatusRequestProto.newBuilder()
-          .setDagId(dagId);
+          .setDagId(dagId).setTimeout(timeout);
 
     if (statusOptions != null) {
       requestProtoBuilder.addAllStatusOptions(

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index aa0d938..143cded 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -35,6 +35,7 @@ message GetAllDAGsResponseProto {
 message GetDAGStatusRequestProto {
   optional string dagId = 1;
   repeated StatusGetOptsProto statusOptions = 3;
+  optional int64 timeout = 4;
 }
 
 message GetDAGStatusResponseProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index bad0d2e..c6894ef 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -207,13 +207,13 @@ public class TestDAGClient {
   public void testDAGStatus() throws Exception{
     DAGStatus resultDagStatus = dagClient.getDAGStatus(null);
     verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).build());
+        .setDagId(dagIdStr).setTimeout(0).build());
     assertEquals(new DAGStatus(dagStatusProtoWithoutCounters), resultDagStatus);
     System.out.println("DAGStatusWithoutCounter:" + resultDagStatus);
     
     resultDagStatus = dagClient.getDAGStatus(Sets.newSet(StatusGetOpts.GET_COUNTERS));
     verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+        .setDagId(dagIdStr).setTimeout(0).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
     assertEquals(new DAGStatus(dagStatusProtoWithCounters), resultDagStatus);
     System.out.println("DAGStatusWithCounter:" + resultDagStatus);
   }
@@ -252,7 +252,7 @@ public class TestDAGClient {
       
     dagClient.waitForCompletion();
     verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).build());
+        .setDagId(dagIdStr).setTimeout(0).build());
   }
 
   @Test(timeout = 5000)
@@ -271,7 +271,7 @@ public class TestDAGClient {
     //  second & third time for check completion
     dagClient.waitForCompletionWithStatusUpdates(null);
     verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).build());
+        .setDagId(dagIdStr).setTimeout(0).build());
 
     
     when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
@@ -282,7 +282,7 @@ public class TestDAGClient {
              .build());
     dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS));
     verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-      .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+      .setDagId(dagIdStr).setTimeout(0).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
   }
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index d14ed2a..e40b208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -62,6 +62,11 @@ public class DAGClientHandler {
     return getDAG(dagIdStr).getDAGStatus(statusOptions);
   }
 
+  public DAGStatus getDAGStatus(String dagIdStr,
+      Set<StatusGetOpts> statusOptions, long timeout) throws TezException {
+    return getDAG(dagIdStr).getDAGStatus(statusOptions, timeout);
+  }
+
   public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
       Set<StatusGetOpts> statusOptions) throws TezException {
     VertexStatus status =

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index c054305..4c29b79 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -91,13 +91,14 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     UserGroupInformation user = getRPCUser();
     try {
       String dagId = request.getDagId();
+      long timeout = request.getTimeout();
       if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
         throw new AccessControlException("User " + user + " cannot perform DAG view operation");
       }
       DAGStatus status;
       status = real.getDAGStatus(dagId,
         DagTypeConverters.convertStatusGetOptsFromProto(
-          request.getStatusOptionsList()));
+            request.getStatusOptionsList()), timeout);
       assert status instanceof DAGStatusBuilder;
       DAGStatusBuilder builder = (DAGStatusBuilder) status;
       return GetDAGStatusResponseProto.newBuilder().

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8677015..1b64754 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
@@ -72,6 +73,8 @@ public interface DAG {
 
   DAGPlan getJobPlan();
   DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
+  DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions, long timeout)
+      throws TezException;
   VertexStatusBuilder getVertexStatus(String vertexName,
                                       Set<StatusGetOpts> statusOptions);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index dd38c29..d1dcfb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -386,6 +387,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   @VisibleForTesting
   boolean recoveryCommitInProgress = false;
   Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
+  long statusPollInterval;
 
   static class VertexGroupInfo {
     String groupName;
@@ -464,6 +466,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
     this.entityUpdateTracker = new StateChangeNotifier(this);
+    statusPollInterval = dagConf.getLong(
+        TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
+        TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
+    if(statusPollInterval < 0) {
+      LOG.error("DAG Status poll interval cannot be negative and setting to default value.");
+      statusPollInterval = TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT;
+    }
   }
 
   protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
@@ -730,6 +739,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
+  public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions,
+      long timeout) throws TezException {
+    long currentStatusPollInterval = statusPollInterval;
+    if(timeout >= 0 && currentStatusPollInterval > timeout) {
+      currentStatusPollInterval = timeout;
+    }
+    long timeoutTime = System.currentTimeMillis() + timeout;
+    while (timeout < 0 || (timeout > 0 && timeoutTime > System.currentTimeMillis()))
{
+      if(isComplete()) {
+        break;
+      }
+      try {
+        Thread.sleep(currentStatusPollInterval);
+      } catch (InterruptedException e) {
+        throw new TezException(e);
+      }
+    }
+    return getDAGStatus(statusOptions);
+  }
+
   private ProgressBuilder getDAGProgress() {
     int totalTaskCount = 0;
     int totalSucceededTaskCount = 0;

http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
index e05f25c..d743feb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
@@ -85,4 +85,10 @@ public class MRDAGClient extends DAGClient {
   public void close() throws IOException {
     realClient.close();
   }
+
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout) throws IOException, TezException {
+    return getDAGStatus(statusOptions);
+  }
 }


Mime
View raw message