tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-211. TezClient.submit should return a DAGClient instead of an AppId (bikas)
Date Thu, 13 Jun 2013 21:32:17 GMT
Updated Branches:
  refs/heads/master 622314a81 -> f6554fe8d


TEZ-211. TezClient.submit should return a DAGClient instead of an AppId (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f6554fe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f6554fe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f6554fe8

Branch: refs/heads/master
Commit: f6554fe8da0f7d01df464f739770f57e4407677b
Parents: 622314a
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jun 13 14:29:19 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jun 13 14:29:19 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/records/TezDAGID.java    |  44 ++--
 tez-dag-api/pom.xml                             |  11 -
 .../java/org/apache/tez/client/TezClient.java   | 130 ++++-------
 .../apache/tez/dag/api/client/DAGClient.java    |  21 +-
 .../apache/tez/dag/api/client/DAGStatus.java    |  18 ++
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 214 +++++++++++++++----
 .../tez/dag/api/client/DAGClientServer.java     |   5 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |  13 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  12 +-
 .../dag/app/dag/impl/TestVertexScheduler.java   |  71 +++++-
 .../examples/GroupByOrderByMRRTest.java         | 110 ++++------
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  15 +-
 .../tez/mapreduce/ClientServiceDelegate.java    |  73 +------
 .../org/apache/tez/mapreduce/DAGJobStatus.java  |  64 +++---
 .../org/apache/tez/mapreduce/YARNRunner.java    |  39 ++--
 15 files changed, 461 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index 9a0a409..eff9764 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -37,14 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
  * @see ApplicationId
  */
 public class TezDAGID extends TezID {
-
-  public static final String DAG = "dag";
-  protected static final NumberFormat idFormat = NumberFormat.getInstance();
-  static {
-    idFormat.setGroupingUsed(false);
-    idFormat.setMinimumIntegerDigits(6);
-  }
-
+  
   private ApplicationId applicationId;
 
   public TezDAGID() {
@@ -95,6 +88,28 @@ public class TezDAGID extends TezID {
     return this.applicationId.compareTo(that.applicationId);
   }
 
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    applicationId = ApplicationId.newInstance(in.readLong(), in.readInt());
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(applicationId.getClusterTimestamp());
+    out.writeInt(applicationId.getId());
+    super.write(out);
+  }
+
+  // DO NOT CHANGE THIS. TezClient replicates this code to create the default DAG id string
+  public static final String DAG = "dag";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
   @Override
   public String toString() {
     return appendTo(new StringBuilder(DAG)).toString();
@@ -114,19 +129,6 @@ public class TezDAGID extends TezID {
     return null;
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    applicationId = ApplicationId.newInstance(in.readLong(), in.readInt());
-    super.readFields(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(applicationId.getClusterTimestamp());
-    out.writeInt(applicationId.getId());
-    super.write(out);
-  }
-
   /**
    * Add the unique string to the given builder.
    * @param builder the builder to append to

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag-api/pom.xml b/tez-dag-api/pom.xml
index 2a91b4d..12f04de 100644
--- a/tez-dag-api/pom.xml
+++ b/tez-dag-api/pom.xml
@@ -50,17 +50,6 @@
         <configuration>
         </configuration>
       </plugin>
-    <plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-     <artifactId>maven-jar-plugin</artifactId>
-      <configuration>
-       <archive>
-         <manifest>
-           <mainClass>org.apache.tez.client.TezClient</mainClass>
-         </manifest>
-       </archive>
-     </configuration>
-    </plugin>
       <plugin>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-maven-plugins</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index ff8c486..b3fe4f3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -19,8 +19,8 @@
 package org.apache.tez.client;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +30,7 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,14 +44,12 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.YarnClient;
 import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -60,10 +59,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
@@ -74,7 +71,7 @@ public class TezClient {
       FsPermission.createImmutable((short) 0700); // rwx--------
   final public static FsPermission TEZ_AM_FILE_PERMISSION = 
       FsPermission.createImmutable((short) 0644); // rw-r--r--
-  
+    
   private final TezConfiguration conf;
   private YarnClient yarnClient;
   
@@ -84,44 +81,6 @@ public class TezClient {
     yarnClient.init(conf);
     yarnClient.start();
   }
-
-  /**
-   * Returns a <code>DAGClient</code> for the currently running attempt of a
-   * Tez DAG application
-   * @param appIdStr The application id of the app.
-   * @return DAGClient if the app is running. null otherwise. 
-   * @throws IOException
-   * @throws TezException
-   */
-  public DAGClient getDAGClient(String appIdStr) throws IOException, TezUncheckedException {
-    try {
-      ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
-        return null;
-      }
-      String host = appReport.getHost();
-      int port = appReport.getRpcPort();
-      return getDAGClient(host, port);
-    } catch (YarnException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-  
-  /**
-   * Returns a <code>DAGClient</code> for a Tez application listening at the 
-   * given host and port  
-   * @param host
-   * @param port
-   * @return
-   * @throws IOException
-   */
-  public DAGClient getDAGClient(String host, int port) throws IOException {
-    InetSocketAddress addr = new InetSocketAddress(host, port);
-    DAGClient dagClient;
-    dagClient = new DAGClientRPCImpl(1, addr, conf);
-    return dagClient;    
-  }
   
   /**
    * Submit a Tez DAG to YARN as an application
@@ -137,14 +96,13 @@ public class TezClient {
    * @throws IOException
    * @throws YarnException
    */
-  public ApplicationId submitDAGApplication(DAG dag, Path appStagingDir,
+  public DAGClient submitDAGApplication(DAG dag, Path appStagingDir,
       Credentials ts, String amQueueName, List<String> amArgs,
       Map<String, String> amEnv, Map<String, LocalResource> amLocalResources)
-      throws IOException, YarnException {
+      throws IOException, TezException {
     ApplicationId appId = createApplication();
-    submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName,
+    return submitDAGApplication(appId, dag, appStagingDir, ts, amQueueName,
         amArgs, amEnv, amLocalResources);
-    return appId;
   }
 
   /**
@@ -162,16 +120,21 @@ public class TezClient {
    * @throws IOException
    * @throws YarnException
    */
-  public void submitDAGApplication(ApplicationId appId, DAG dag,
+  public DAGClient submitDAGApplication(ApplicationId appId, DAG dag,
       Path appStagingDir, Credentials ts, String amQueueName,
       List<String> amArgs, Map<String, String> amEnv,
       Map<String, LocalResource> amLocalResources) throws IOException,
-      YarnException {
-    ApplicationSubmissionContext appContext = createApplicationSubmissionContext(
-        appId, dag, appStagingDir, ts, amQueueName, dag.getName(), amArgs, amEnv,
-        amLocalResources);
+      TezException {
+    try {
+      ApplicationSubmissionContext appContext = createApplicationSubmissionContext(
+          appId, dag, appStagingDir, ts, amQueueName, dag.getName(), amArgs,
+          amEnv, amLocalResources);
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
 
-    yarnClient.submitApplication(appContext);
+    return getDAGClient(appId);
   }
   
   /**
@@ -180,10 +143,20 @@ public class TezClient {
    * @throws YarnException
    * @throws IOException
    */
-  public ApplicationId createApplication() throws YarnException, IOException {
-    return yarnClient.getNewApplication().getApplicationId();
+  public ApplicationId createApplication() throws TezException, IOException {
+    try {
+      return yarnClient.getNewApplication().getApplicationId();
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
   }
-  
+
+  @Private
+  public DAGClient getDAGClient(ApplicationId appId) 
+      throws IOException, TezException {
+      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), conf);
+  }
+
   private void addLog4jSystemProperties(String logLevel,
       List<String> vargs) {
     vargs.add("-Dlog4j.configuration=container-log4j.properties");
@@ -365,31 +338,22 @@ public class TezClient {
 
     return appContext;
   }
-
   
-  public static void main(String[] args) {
-    try {
-      TezClient tezClient = new TezClient(
-          new TezConfiguration(new YarnConfiguration()));
-      DAGClient dagClient = tezClient.getDAGClient(args[1]);
-      String dagId = dagClient.getAllDAGs().get(0);
-      DAGStatus dagStatus = dagClient.getDAGStatus(dagId);
-      System.out.println("DAG: " + dagId + 
-                         " State: " + dagStatus.getState() +
-                         " Progress: " + dagStatus.getDAGProgress());
-      for(String vertexName : dagStatus.getVertexProgress().keySet()) {
-        System.out.println("VertexStatus from DagStatus:" +
-                           " Vertex: " + vertexName +
-                           " Progress: " + dagStatus.getVertexProgress().get(vertexName));
-        VertexStatus vertexStatus = dagClient.getVertexStatus(dagId, vertexName);
-        System.out.println("VertexStatus:" + 
-                           " Vertex: " + vertexName + 
-                           " State: " + vertexStatus.getState() + 
-                           " Progress: " + vertexStatus.getProgress());
-      }
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+  // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
+  private static final char SEPARATOR = '_';
+  private static final String DAG = "dag";
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+  
+  String getDefaultTezDAGID(ApplicationId appId) {
+     return (new StringBuilder(DAG)).append(SEPARATOR).
+                   append(appId.getClusterTimestamp()).
+                   append(SEPARATOR).
+                   append(appId.getId()).
+                   append(SEPARATOR).
+                   append(idFormat.format(1)).toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 36c8f66..c62d651 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -18,31 +18,28 @@
 
 package org.apache.tez.dag.api.client;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.TezException;
 
 /*
- * Inteface class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Interface class for monitoring the <code>DAG</code> running in a Tez DAG
  * Application Master.
  */
-public interface DAGClient {
+public interface DAGClient extends Closeable {
   
-  /**
-   * Return the identifiers for all DAG's
-   */
-  List<String> getAllDAGs() throws IOException, TezException;
+  public ApplicationId getApplicationId();
   
   /**
-   * Get the status of a DAG
+   * Get the status of the specified DAG
    */
-  DAGStatus getDAGStatus(String dagId) throws IOException, TezException;
+  public DAGStatus getDAGStatus() throws IOException, TezException;
   
   /**
    * Get the status of a Vertex of a DAG 
    */
-  VertexStatus getVertexStatus(String dagId, 
-                               String vertexName) 
-                                   throws IOException, TezException;
+  public VertexStatus getVertexStatus(String vertexName)
+      throws IOException, TezException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 83b34a5..c2c0b88 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -72,6 +72,15 @@ public class DAGStatus {
     return proxy.getDiagnosticsList();
   }
 
+  /**
+   * Gets overall progress value of the DAG.
+   * 
+   * @return Progress of the DAG. May be null when the DAG is not running. May be
+   *         null when the DAG is running and the application master cannot be
+   *         reached - e.g. when the execution platform has restarted the
+   *         application master.
+   * @see Progress
+   */
   public Progress getDAGProgress() {
     if(progress == null && proxy.hasDAGProgress()) {
       progress = new Progress(proxy.getDAGProgress());
@@ -79,6 +88,15 @@ public class DAGStatus {
     return progress;
   }
 
+  /**
+   * Get the progress of a vertex in the DAG
+   * 
+   * @return Progress of the vertex. May be null when the DAG is not running.
+   *         May be null when the DAG is running and the application master
+   *         cannot be reached - e.g. when the execution platform has restarted
+   *         the application master.
+   * @see Progress
+   */
   public Map<String, Progress> getVertexProgress() {
     if(vertexProgress == null) {
       if(proxy.getVertexProgressList() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 989906d..db30111 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -18,72 +18,178 @@
 
 package org.apache.tez.dag.api.client.rpc;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.List;
+import java.util.Collections;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
 
 import com.google.protobuf.ServiceException;
 
-public class DAGClientRPCImpl implements DAGClient, Closeable {
+public class DAGClientRPCImpl implements DAGClient {
+  private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
 
+  private ApplicationId appId;
+  private String dagId;
+  private TezConfiguration conf;
+  private YarnApplicationState appState;
+  private YarnClient yarnClient;
   private DAGClientAMProtocolBlockingPB proxy = null;
   
-  public DAGClientRPCImpl(long clientVersion, 
-                          InetSocketAddress addr,
-                          TezConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, 
-                          DAGClientAMProtocolBlockingPB.class, 
-                          ProtobufRpcEngine.class);
-    proxy =
-        (DAGClientAMProtocolBlockingPB) 
-        RPC.getProxy(DAGClientAMProtocolBlockingPB.class, 
-                     clientVersion,
-                     addr, 
-                     conf);
+  public DAGClientRPCImpl(ApplicationId appId, String dagId,
+      TezConfiguration conf) {
+    this.appId = appId;
+    this.dagId = dagId;
+    this.conf = conf;
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(conf);
+    yarnClient.start();
+    appState = null;
   }
   
   @Override
-  public List<String> getAllDAGs() throws IOException, TezException {
-    GetAllDAGsRequestProto requestProto = 
-        GetAllDAGsRequestProto.newBuilder().build();
-    try {
-      return proxy.getAllDAGs(null, requestProto).getDagIdList();
-    } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezRemoteException
-      throw new TezException(e);
+  public ApplicationId getApplicationId() {
+    return appId;
+  }
+  
+  @Override
+  public DAGStatus getDAGStatus() throws IOException, TezException {
+    if(createAMProxyIfNeeded()) {
+      try {
+        return getDAGStatusViaAM();
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
     }
+    
+    // Later maybe from History
+    return getDAGStatusViaRM();
   }
 
   @Override
-  public DAGStatus getDAGStatus(String dagId) 
+  public VertexStatus getVertexStatus(String vertexName)
                                     throws IOException, TezException {
-    GetDAGStatusRequestProto requestProto = 
-        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+    if(createAMProxyIfNeeded()) {
+      try {
+        return getVertexStatusViaAM(vertexName);
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
+    }
     
+    // need AM for this. Later maybe from History
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+    if(yarnClient != null) {
+      yarnClient.stop();
+    }
+  }
+  
+  void resetProxy(Exception e) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId + 
+          " due to exception :", e);
+    }
+    proxy = null;
+  }
+  
+  DAGStatus getDAGStatusViaAM() throws IOException, TezException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
+    }
+    GetDAGStatusRequestProto requestProto = 
+        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();    
     try {
       return new DAGStatus(
                  proxy.getDAGStatus(null, requestProto).getDagStatus());
     } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezRemoteException
+      // TEZ-151 retrieve wrapped TezException
       throw new TezException(e);
     }
   }
-
-  @Override
-  public VertexStatus getVertexStatus(String dagId, String vertexName)
-                                    throws IOException, TezException {
+  
+  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
+    }
+    ApplicationReport appReport;
+    try {
+      appReport = yarnClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
+    DAGStatus dagStatus = new DAGStatus(builder);
+    DAGStatusStateProto dagState = null;
+    switch (appReport.getYarnApplicationState()) {
+    case NEW:
+    case NEW_SAVING:
+    case SUBMITTED:
+    case ACCEPTED:
+      dagState = DAGStatusStateProto.DAG_SUBMITTED;
+      break;
+    case RUNNING:
+      dagState = DAGStatusStateProto.DAG_RUNNING;
+      break;
+    case FAILED:
+      dagState = DAGStatusStateProto.DAG_FAILED;
+      break;
+    case KILLED:
+      dagState = DAGStatusStateProto.DAG_KILLED;
+      break;      
+    case FINISHED:
+      switch(appReport.getFinalApplicationStatus()) {
+      case UNDEFINED:
+      case FAILED:
+        dagState = DAGStatusStateProto.DAG_FAILED;
+        break;
+      case KILLED:
+        dagState = DAGStatusStateProto.DAG_KILLED;
+        break;        
+      case SUCCEEDED:
+        dagState = DAGStatusStateProto.DAG_SUCCEEDED;
+        break;
+      }
+      break;
+    }
+    
+    builder.setState(dagState);
+    if(appReport.getDiagnostics() != null) {
+      builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+    }
+    
+    return dagStatus;
+  }
+  
+  VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+          + " vertex: " + vertexName);
+    }
     GetVertexStatusRequestProto requestProto = 
         GetVertexStatusRequestProto.newBuilder().
                         setDagId(dagId).setVertexName(vertexName).build();
@@ -92,16 +198,50 @@ public class DAGClientRPCImpl implements DAGClient, Closeable {
       return new VertexStatus(
                  proxy.getVertexStatus(null, requestProto).getVertexStatus());
     } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezRemoteException
+      // TEZ-151 retrieve wrapped TezException
       throw new TezException(e);
     }
   }
 
-  @Override
-  public void close() throws IOException {
-    if (this.proxy != null) {
-      RPC.stopProxy(this.proxy);
+  ApplicationReport getAMState() throws IOException, TezException {
+    try {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("App: " + appId + " in state: "
+            + appReport.getYarnApplicationState());
+      }
+      return appReport;
+    } catch (YarnException e) {
+      throw new TezException(e);
     }
   }
 
+  boolean createAMProxyIfNeeded() throws IOException, TezException {
+    if(proxy != null) {
+      // if the proxy exists, optimistically use it assuming there is no retry
+      return true;
+    }
+    ApplicationReport appReport = getAMState();
+    appState = appReport.getYarnApplicationState();
+    if(appState != YarnApplicationState.RUNNING) {
+      return false;
+    }
+    
+    // YARN-808. Cannot ascertain if AM is ready until we connect to it.
+    // workaround check the default string set by YARN
+    if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||
+        appReport.getRpcPort() == 0){
+      // attempt not running
+      return false;
+    }
+
+    InetSocketAddress addr = new InetSocketAddress(appReport.getHost(),
+        appReport.getRpcPort());
+
+    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+        ProtobufRpcEngine.class);
+    proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+        DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 6222989..dbaf84e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -34,17 +34,18 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
+import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 
 import com.google.protobuf.BlockingService;
 
 public class DAGClientServer extends AbstractService {
   static final Log LOG = LogFactory.getLog(DAGClientServer.class);
       
-  DAGClient realInstance;
+  DAGClientHandler realInstance;
   Server server;
   InetSocketAddress bindAddress;
 
-  public DAGClientServer(DAGClient realInstance) {
+  public DAGClientServer(DAGClientHandler realInstance) {
     super("DAGClientRPCServer");
     this.realInstance = realInstance;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index df679f0..2b9c723 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -18,11 +18,9 @@
 
 package org.apache.tez.dag.api.client.rpc;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -34,6 +32,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequ
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+import org.apache.tez.dag.app.DAGAppMaster.DAGClientHandler;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -41,9 +40,9 @@ import com.google.protobuf.ServiceException;
 public class DAGClientAMProtocolBlockingPBServerImpl implements
     DAGClientAMProtocolBlockingPB {
   
-  DAGClient real;
+  DAGClientHandler real;
   
-  public DAGClientAMProtocolBlockingPBServerImpl(DAGClient real) {
+  public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real) {
     this.real = real;
   }
 
@@ -55,8 +54,6 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
       return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build();
     } catch(TezException e) {
       throw wrapException(e);
-    } catch(IOException e) {
-      throw wrapException(e);
     }
   }
 
@@ -73,8 +70,6 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
                                 setDagStatus(builder.getProto()).build();
     } catch (TezException e) {
       throw wrapException(e);
-    } catch(IOException e) {
-      throw wrapException(e);
     }
   }
 
@@ -91,8 +86,6 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
                             setVertexStatus(builder.getProto()).build();
     } catch (TezException e) {
       throw wrapException(e);
-    } catch(IOException e) {
-      throw wrapException(e);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4151bcc..f842f89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -710,22 +709,19 @@ public class DAGAppMaster extends CompositeService {
     LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
   }
 
-  class DAGClientHandler implements DAGClient {
+  public class DAGClientHandler {
 
-    @Override
     public List<String> getAllDAGs() throws TezException {
       return Collections.singletonList(dag.getID().toString());
     }
 
-    @Override
     public DAGStatus getDAGStatus(String dagIdStr)
-                                      throws IOException, TezException {
+                                      throws TezException {
       return getDAG(dagIdStr).getDAGStatus();
     }
 
-    @Override
     public VertexStatus getVertexStatus(String dagIdStr, String vertexName)
-        throws IOException, TezException{
+        throws TezException{
       VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName);
       if(status == null) {
         throw new TezException("Unknown vertexName: " + vertexName);
@@ -734,7 +730,7 @@ public class DAGAppMaster extends CompositeService {
       return status;
     }
 
-    DAG getDAG(String dagIdStr) throws IOException, TezException {
+    DAG getDAG(String dagIdStr) throws TezException {
       TezDAGID dagId = TezDAGID.fromString(dagIdStr);
       if(dagId == null) {
         throw new TezException("Bad dagId: " + dagIdStr);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 030a6ac..38cc9fe 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -162,6 +161,56 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
     
+    // min, max > 0 and min == max
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 1.0f, 1.0f);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    // task completion from non-bipartite stage does nothing
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
+    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
+    
+    // min, max > 0 and min == max
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 1.0f, 1.0f);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    // task completion from non-bipartite stage does nothing
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId31);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 0);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 1);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3);
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
+    Assert.assertTrue(scheduler.pendingTasks.isEmpty());
+    Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
+    
     // min, max > and min < max
     scheduler = 
         new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0.25f, 0.75f);
@@ -183,5 +232,25 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
     Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
 
+    // min, max > and min < max
+    scheduler = 
+        new BipartiteSlowStartVertexScheduler(mockManagedVertex, 0.25f, 1.0f);
+    scheduler.onVertexStarted();
+    Assert.assertTrue(scheduler.pendingTasks.size() == 3); // no tasks scheduled
+    Assert.assertTrue(scheduler.numSourceTasks == 4);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 2);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 2);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId21);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 1);
+    Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 3);
+    scheduler.onSourceTaskCompleted(mockSrcAttemptId22);
+    Assert.assertTrue(scheduler.pendingTasks.size() == 0);
+    Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
+    Assert.assertTrue(scheduler.numSourceTasksCompleted == 4);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
index 6a14e62..950328c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -37,17 +37,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 
@@ -178,6 +173,7 @@ public class GroupByOrderByMRRTest {
       System.exit(2);
     }
 
+    @SuppressWarnings("deprecation")
     Job job = new Job(conf, "groupbyorderbymrrtest");
 
     job.setJarByClass(GroupByOrderByMRRTest.class);
@@ -196,27 +192,21 @@ public class GroupByOrderByMRRTest {
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
-    YarnClient yarnClient = new YarnClientImpl();
-    yarnClient.init(conf);
-    yarnClient.start();
-
     TezClient tezClient = new TezClient(new TezConfiguration(conf));
 
     job.submit();
     JobID jobId = job.getJobID();
     ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
 
-    DAGClient dagClient = null;
-    ApplicationReport  appReport;
+    DAGClient dagClient = tezClient.getDAGClient(appId);
+    DAGStatus dagStatus = null;
     while (true) {
-      appReport = yarnClient.getApplicationReport(appId);
-      if(appReport.getYarnApplicationState() == YarnApplicationState.RUNNING) {
-        dagClient = tezClient.getDAGClient(appId.toString());
-        break;
-      }
-      if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
-        || appReport.getYarnApplicationState() == YarnApplicationState.FAILED
-        || appReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
+      dagStatus = dagClient.getDAGStatus();
+      if(dagStatus.getState() == DAGStatus.State.RUNNING || 
+         dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+         dagStatus.getState() == DAGStatus.State.FAILED ||
+         dagStatus.getState() == DAGStatus.State.KILLED || 
+         dagStatus.getState() == DAGStatus.State.ERROR) {
         break;
       }
       try {
@@ -227,60 +217,48 @@ public class GroupByOrderByMRRTest {
     }
 
     DecimalFormat formatter = new DecimalFormat("###.##%");
-    while (dagClient != null) {
+    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
       try {
-        String dagId = dagClient.getAllDAGs().get(0);
-        DAGStatus dagStatus = dagClient.getDAGStatus(dagId);
-        System.out.println("");
-        System.out.println("DAG: " + dagId +
-            " State: " + dagStatus.getState() +
-            " Progress: " +
-            formatter.format(
-                ((double)dagStatus.getDAGProgress().getSucceededTaskCount()
-                    + (double)dagStatus.getDAGProgress().getKilledTaskCount()
-                    + (double)dagStatus.getDAGProgress().getFailedTaskCount())/
-                    (double)dagStatus.getDAGProgress().getTotalTaskCount()));
-        final String[] vNames = {"initialmap", "ivertex1", "finalreduce"};
-        for(String vertexName : vNames) {
-          Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
-          System.out.println("VertexStatus:" +
-              " VertexName: " + (vertexName.equals("ivertex1") ?
-                  "intermediate-reducer" : vertexName) +
-              " Progress: " +
-                formatter.format(((double)vProgress.getSucceededTaskCount()
-                  + (double)vProgress.getKilledTaskCount()
-                  + (double)vProgress.getFailedTaskCount())/
-                  (double)vProgress.getTotalTaskCount()));
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            // continue;
+        Progress progress = dagStatus.getDAGProgress();
+        if (progress != null) {
+          System.out.println("");
+          System.out.println("DAG: State: "
+              + dagStatus.getState()
+              + " Progress: "
+              + formatter.format((progress.getSucceededTaskCount()
+                  + progress.getKilledTaskCount() + progress
+                    .getFailedTaskCount())
+                  / (double) progress.getTotalTaskCount()));
+          final String[] vNames = { "initialmap", "ivertex1", "finalreduce" };
+          for (String vertexName : vNames) {
+            Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
+            if (vProgress != null) {
+              System.out.println("VertexStatus:"
+                  + " VertexName: "
+                  + (vertexName.equals("ivertex1") ? "intermediate-reducer"
+                      : vertexName)
+                  + " Progress: "
+                  + formatter.format((vProgress.getSucceededTaskCount()
+                      + vProgress.getKilledTaskCount() + vProgress
+                        .getFailedTaskCount())
+                      / (double) vProgress.getTotalTaskCount()));
+            }
           }
         }
-      } catch (TezException e) {
-        // AM not responding
-        dagClient = null;
-        appReport = yarnClient.getApplicationReport(appId);
-        if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING){
-          LOG.info("App not running. Falling back to RM for report.");
-        } else {
-          LOG.warn("App running but failed to get report from AM.", e);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // continue;
         }
+        dagStatus = dagClient.getDAGStatus();
+      } catch (TezException e) {
+        LOG.fatal("Failed to get application progress. Exiting");
+        System.exit(-1);
       }
     }
 
-    if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
-        || appReport.getYarnApplicationState() == YarnApplicationState.FAILED
-        || appReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
-      LOG.info("Application completed. "
-          + "FinalState=" + appReport.getFinalApplicationStatus());
-      System.exit(
-          appReport.getYarnApplicationState() == YarnApplicationState.FINISHED?
-              0 : 1);
-    }
-
-    LOG.fatal("Failed to get application progress. Exiting");
-    System.exit(-1);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 9052e02..3b71aac 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -317,25 +316,17 @@ public class TestMRRJobsDAGApi {
     TezClient tezClient = new TezClient(new TezConfiguration(
         mrrTezCluster.getConfig()));
     // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
-    ApplicationId appId = tezClient.submitDAGApplication(dag, remoteStagingDir,
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, remoteStagingDir,
         null, "default", Collections.singletonList(""), commonEnv, amLocalResources);
-    DAGClient dagClient = tezClient.getDAGClient(appId.toString());
 
-    while (dagClient == null) {
-      LOG.info("Waiting to get DAG Client. Sleeping for 200ms");
-      // TODO Fix the looping after TEZ-206 is fixed.
-      Thread.sleep(200l);
-      dagClient = tezClient.getDAGClient(appId.toString());
-    }
-    String dagId = dagClient.getAllDAGs().get(0);
-    DAGStatus dagStatus = dagClient.getDAGStatus(dagId);
+    DAGStatus dagStatus = dagClient.getDAGStatus();
     while (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
       LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: "
           + dagStatus);
       // TODO The test will fail if the AM sleep is removed. TEZ-207 to fix
       // this.
       Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(dagId);
+      dagStatus = dagClient.getDAGStatus();
     }
 
     // TODO Add additional checks for tracking URL etc. - once it's exposed by

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
index 0df6370..44b2734 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
@@ -18,35 +18,19 @@
 
 package org.apache.tez.mapreduce;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 
 import java.io.IOException;
 
 public class ClientServiceDelegate {
-  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
 
   private final TezConfiguration conf;
-  private final ResourceMgrDelegate rm;
-  private DAGClient dagClient;
-  private String currentDAGId;
-  private TezClient tezClient;
-  private ApplicationReport appReport;
 
   // FIXME
   // how to handle completed jobs that the RM does not know about?
@@ -59,8 +43,6 @@ public class ClientServiceDelegate {
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
         this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
             MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
-    this.rm = rm;
-    tezClient = new TezClient(this.conf);
   }
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
@@ -89,47 +71,8 @@ public class ClientServiceDelegate {
   }
   
   public JobStatus getJobStatus(JobID oldJobID) throws IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
-      TypeConverter.toYarn(oldJobID);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    String jobFile = MRApps.getJobFile(conf, user, oldJobID);
-
-    try {
-      if(dagClient == null) {
-        appReport = getAppReport(oldJobID);
-        if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
-          // if job not running return status from appReport;
-          return getJobStatusFromRM(appReport, jobFile);
-        } else {
-          // job is running. create dag am client
-          dagClient = tezClient.getDAGClient(appReport.getHost(), 
-                                             appReport.getRpcPort());
-          currentDAGId = dagClient.getAllDAGs().get(0);
-        }
-      }
-      // return status from client. use saved appReport for queue etc
-      DAGStatus dagStatus = dagClient.getDAGStatus(currentDAGId);
-      return new DAGJobStatus(appReport, dagStatus, jobFile);
-    } catch (TezException e) {
-      // AM not responding
-      dagClient = null;
-      currentDAGId = null;
-      appReport = getAppReport(oldJobID);
-      if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
-        LOG.info("App not running. Falling back to RM for report.");
-      } else {
-        LOG.warn("App running but failed to get report from AM.", e);
-      }
-    }
-    
-    // final fallback
-    return getJobStatusFromRM(appReport, jobFile);
-  }
-  
-  private JobStatus getJobStatusFromRM(ApplicationReport appReport, String jobFile) {
-    JobStatus jobStatus =
-      new DAGJobStatus(appReport, null, jobFile);
-    return jobStatus;            
+    // handled in YARNRunner
+    throw new UnsupportedOperationException();
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
@@ -158,15 +101,5 @@ public class ClientServiceDelegate {
       throws YarnException, IOException {
     // FIXME logs for an attempt?
     throw new UnsupportedOperationException();
-  }
-  
-  private ApplicationReport getAppReport(JobID oldJobID) throws IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
-      TypeConverter.toYarn(oldJobID);
-    try {
-      return rm.getApplicationReport(jobId.getAppId());
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
index 10d05b3..27f4aa4 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
@@ -30,24 +30,22 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.mortbay.log.Log;
 
 public class DAGJobStatus extends JobStatus {
 
-  String jobFile;
-  private final ApplicationReport report;
+  private final String jobFile;
   private final DAGStatus dagStatus;
+  private final ApplicationReport report;
   
-  public DAGJobStatus(ApplicationReport appReport, DAGStatus dagStatus, String jobFile) {
+  public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
     super();
-    this.report = appReport;
     this.dagStatus = dagStatus;
     this.jobFile = jobFile;
+    this.report = report;
   }
   
   @Override
@@ -137,13 +135,10 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized float getMapProgress() {
-    if(dagStatus != null) {
+    if(dagStatus.getVertexProgress() != null) {
       return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
     }
-    if (report.getYarnApplicationState().equals(
-        YarnApplicationState.FINISHED)
-        && report.getFinalApplicationStatus().equals(
-            FinalApplicationStatus.SUCCEEDED)) {
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
       return 1.0f;
     }
     return 0.0f;
@@ -151,8 +146,10 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized float getCleanupProgress() {
-    if (report.getYarnApplicationState().equals(
-        YarnApplicationState.FINISHED)) {
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+        dagStatus.getState() == DAGStatus.State.FAILED || 
+        dagStatus.getState() == DAGStatus.State.KILLED ||
+        dagStatus.getState() == DAGStatus.State.ERROR) {
       return 1.0f;
     }
     return 0.0f;
@@ -160,10 +157,7 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized float getSetupProgress() {
-    if (report.getYarnApplicationState().equals(
-        YarnApplicationState.RUNNING)
-        && report.getFinalApplicationStatus().equals(
-            FinalApplicationStatus.UNDEFINED)) {
+    if (dagStatus.getState() == DAGStatus.State.RUNNING) {
       return 1.0f;
     }
     return 0.0f;
@@ -174,10 +168,7 @@ public class DAGJobStatus extends JobStatus {
     if(dagStatus != null) {
       return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
     }
-    if (report.getYarnApplicationState().equals(
-        YarnApplicationState.FINISHED)
-        && report.getFinalApplicationStatus().equals(
-            FinalApplicationStatus.SUCCEEDED)) {
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
       return 1.0f;
     }
     return 0.0f;
@@ -185,8 +176,23 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized State getState() {
-    return TypeConverter.fromYarn(report.getYarnApplicationState(),
-        report.getFinalApplicationStatus());
+    switch (dagStatus.getState()) {
+    case SUBMITTED:
+    case INITING:
+      return State.PREP;
+    case RUNNING:
+      return State.RUNNING;
+    case SUCCEEDED:
+      return State.SUCCEEDED;
+    case KILLED:
+      return State.KILLED;
+    case FAILED:
+    case ERROR:
+      return State.FAILED;
+    default:
+      throw new TezUncheckedException("Unknown value of DAGState.State:"
+          + dagStatus.getState());
+    }
   }
 
   @Override
@@ -228,12 +234,10 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized boolean isJobComplete() {
-    return (report.getYarnApplicationState().equals(
-        YarnApplicationState.FINISHED)
-        || report.getYarnApplicationState().equals(
-            YarnApplicationState.FAILED)
-        || report.getYarnApplicationState().equals(
-            YarnApplicationState.KILLED));
+    return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+        dagStatus.getState() == DAGStatus.State.FAILED || 
+        dagStatus.getState() == DAGStatus.State.KILLED ||
+        dagStatus.getState() == DAGStatus.State.ERROR);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f6554fe8/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 1eae3bb..aaaa169 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnRuntimeException;
@@ -81,7 +82,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Apps;
@@ -94,8 +94,11 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -127,6 +130,7 @@ public class YARNRunner implements ClientProtocol {
   
   private final TezConfiguration tezConf;
   private final TezClient tezClient;
+  private DAGClient dagClient;
 
   /**
    * Yarn runner incapsulates the client interface of
@@ -775,7 +779,7 @@ public class YARNRunner implements ClientProtocol {
     // Submit to ResourceManager
     try {
       Path appStagingDir = fs.resolvePath(new Path(jobSubmitDir));
-      tezClient.submitDAGApplication(
+      dagClient = tezClient.submitDAGApplication(
           appId,
           dag, 
           appStagingDir, 
@@ -785,21 +789,11 @@ public class YARNRunner implements ClientProtocol {
           environment, 
           jobLocalResources);
 
-      ApplicationReport appMasterReport = resMgrDelegate
-          .getApplicationReport(appId);
-      String diagnostics = (appMasterReport == null ? "application report is null"
-          : appMasterReport.getDiagnostics());
-      if (appMasterReport == null
-          || appMasterReport.getYarnApplicationState() == YarnApplicationState.FAILED
-          || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
-        throw new IOException("Failed to run job : " + diagnostics);
-      }
-    } catch (YarnException e) {
+    } catch (TezException e) {
       throw new IOException(e);
     }
 
-    // FIXME
-    return clientCache.getClient(jobId).getJobStatus(jobId);
+    return getJobStatus(jobId);
   }
 
   private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
@@ -847,8 +841,21 @@ public class YARNRunner implements ClientProtocol {
   @Override
   public JobStatus getJobStatus(JobID jobID) throws IOException,
       InterruptedException {
-    JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
-    return status;
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String jobFile = MRApps.getJobFile(conf, user, jobID);
+    DAGStatus dagStatus;
+    try {
+      dagStatus = dagClient.getDAGStatus();
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+    try {
+      ApplicationReport report = resMgrDelegate
+          .getApplicationReport(resMgrDelegate.getApplicationId());
+      return new DAGJobStatus(report, dagStatus, jobFile);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
   }
 
   @Override


Mime
View raw message