tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-9. Support querying state of the DAG AM from a client via a client RPC (bikas)
Date Wed, 22 May 2013 01:31:52 GMT
Updated Branches:
  refs/heads/TEZ-1 37d7a4cc3 -> ef37823c9


TEZ-9. Support querying state of the DAG AM from a client via a client RPC (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ef37823c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ef37823c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ef37823c

Branch: refs/heads/TEZ-1
Commit: ef37823c90efedb5a0b14f94b4e9e785cb1c5071
Parents: 37d7a4c
Author: Bikas Saha <bikas@apache.org>
Authored: Tue May 21 18:28:41 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue May 21 18:28:41 2013 -0700

----------------------------------------------------------------------
 tez-dag-api/pom.xml                                |   15 ++
 .../main/java/org/apache/tez/client/TezClient.java |   95 +++++++++++++
 .../org/apache/tez/dag/api/TezConfiguration.java   |   11 ++
 .../java/org/apache/tez/dag/api/TezException.java  |    2 +-
 .../org/apache/tez/dag/api/TezRemoteException.java |   31 +++++
 .../org/apache/tez/dag/api/client/DAGClient.java   |   10 +-
 .../org/apache/tez/dag/api/client/Progress.java    |    9 ++
 .../tez/dag/api/client/rpc/DAGClientRPCImpl.java   |   18 ++-
 .../apache/tez/dag/api/client/DAGClientServer.java |  106 +++++++++++++++
 .../DAGClientAMProtocolBlockingPBServerImpl.java   |  103 ++++++++++++++
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |   49 ++++---
 .../apache/tez/dag/app/client/ClientService.java   |   29 ----
 .../tez/dag/app/client/impl/TezClientService.java  |   43 ------
 .../apache/tez/dag/app/client/package-info.java    |   20 ---
 .../tez/dag/app/rm/TaskSchedulerEventHandler.java  |   31 +++--
 .../tez/mapreduce/ClientServiceDelegate.java       |   70 ++++++++--
 .../org/apache/tez/mapreduce/DAGJobStatus.java     |   27 ++++-
 17 files changed, 517 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag-api/pom.xml b/tez-dag-api/pom.xml
index 74b1284..3cb0952 100644
--- a/tez-dag-api/pom.xml
+++ b/tez-dag-api/pom.xml
@@ -34,6 +34,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
   </dependencies>
@@ -46,6 +50,17 @@
         <configuration>
         </configuration>
       </plugin>
+    <plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+     <artifactId>maven-jar-plugin</artifactId>
+      <configuration>
+       <archive>
+         <manifest>
+           <mainClass>org.apache.tez.client.TezClient</mainClass>
+         </manifest>
+       </archive>
+     </configuration>
+    </plugin>
       <plugin>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-maven-plugins</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
new file mode 100644
index 0000000..f794014
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -0,0 +1,95 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezRemoteException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+
+public class TezClient {
+  private final TezConfiguration conf;
+  
+  public TezClient(TezConfiguration conf) {
+    this.conf = conf;
+  }
+
+  public DAGClient getDAGClient(String appIdStr) throws IOException, TezException {
+    try {
+      System.out.println("Fetching app: " + appIdStr);
+      ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+      YarnClient yarnClient = new YarnClientImpl();
+      yarnClient.init(conf);
+      yarnClient.start();
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      String host = appReport.getHost();
+      int port = appReport.getRpcPort();
+      return getDAGClient(host, port);
+    } catch (YarnRemoteException e) {
+      throw new TezException(e);
+    }
+  }
+  
+  public DAGClient getDAGClient(String host, int port) throws IOException {
+    System.out.println("App host port: " + host + ":" + port);
+    InetSocketAddress addr = new InetSocketAddress(host, port);
+    DAGClient dagClient;
+    dagClient = new DAGClientRPCImpl(1, addr, conf);
+    return dagClient;    
+  }
+  
+  public static void main(String[] args) {
+    try {
+      TezClient tezClient = new TezClient(
+          new TezConfiguration(new YarnConfiguration()));
+      DAGClient dagClient = tezClient.getDAGClient(args[1]);
+      String dagId = dagClient.getAllDAGs().get(0);
+      DAGStatus dagStatus = dagClient.getDAGStatus(dagId);
+      System.out.println("DAG: " + dagId + 
+                         " State: " + dagStatus.getState() +
+                         " Progress: " + dagStatus.getDAGProgress());
+      for(String vertexName : dagStatus.getVertexProgress().keySet()) {
+        System.out.println("VertexStatus from DagStatus:" +
+                           " Vertex: " + vertexName +
+                           " Progress: " + dagStatus.getVertexProgress().get(vertexName));
+        VertexStatus vertexStatus = dagClient.getVertexStatus(dagId, vertexName);
+        System.out.println("VertexStatus:" + 
+                           " Vertex: " + vertexName + 
+                           " State: " + vertexStatus.getState() + 
+                           " Progress: " + vertexStatus.getProgress());
+      }
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 597fb2f..4a407b3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -65,6 +65,17 @@ public class TezConfiguration extends Configuration {
   public static final String DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_PREFIX
       + "node-blacklisting.ignore-threshold-node-percent";
   public static final int DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+  
+  /** Number of threads to handle job client RPC requests.*/
+  public static final String DAG_CLIENT_AM_THREAD_COUNT =
+                                    TEZ_PREFIX + "client.am.thread-count";
+  public static final int DAG_CLIENT_AM__THREAD_COUNT_DEFAULT = 1;
+  /** 
+   * Range of ports that the AM can use when binding. Leave blank
+   * if you want all possible ports.
+   */
+  public static final String DAG_CLIENT_AM_PORT_RANGE = 
+                                    TEZ_PREFIX + "client.am.port-range";
 
 
   public static final String DAG_AM_RESOURCE_MEMORY_MB = DAG_AM_PREFIX

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java
index 265b89c..3014309 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java
@@ -18,7 +18,7 @@
 
 package org.apache.tez.dag.api;
 
-/*
+/**
  * Base Tez Exception
  */
 public class TezException extends RuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java
new file mode 100644
index 0000000..8311852
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+/**
+ *  Base TezRemoteException
+ */
+public class TezRemoteException extends Exception {
+  private static final long serialVersionUID = 6337442733802964447L;
+  public TezRemoteException(Throwable cause) { super(cause); }
+  public TezRemoteException(String message) { super(message); }
+  public TezRemoteException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 477b017..0e9e55b 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -18,9 +18,10 @@
 
 package org.apache.tez.dag.api.client;
 
+import java.io.IOException;
 import java.util.List;
 
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezRemoteException;
 
 /*
  * Inteface class for monitoring the <code>DAG</code> running in a Tez DAG
@@ -31,16 +32,17 @@ public interface DAGClient {
   /**
    * Return the identifiers for all DAG's
    */
-  List<String> getAllDAGs() throws TezException;
+  List<String> getAllDAGs() throws IOException, TezRemoteException;
   
   /**
    * Get the status of a DAG
    */
-  DAGStatus getDAGStatus(String dagId) throws TezException;
+  DAGStatus getDAGStatus(String dagId) throws IOException, TezRemoteException;
   
   /**
    * Get the status of a Vertex of a DAG 
    */
   VertexStatus getVertexStatus(String dagId, 
-                               String vertexName) throws TezException;
+                               String vertexName) 
+                                   throws IOException, TezRemoteException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
index 7158e30..af9f2a4 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
@@ -47,5 +47,14 @@ public class Progress {
   public int getKilledTaskCount() {
     return proxy.getKilledTaskCount();
   }
+  
+  @Override
+  public String toString() {
+    return new String("Total: " + getTotalTaskCount() +
+                       " Succeeded: " + getSucceededTaskCount() +
+                       " Running: " + getRunningTaskCount() + 
+                       " Failed: " + getFailedTaskCount() + 
+                       " Killed: " + getKilledTaskCount());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 9238f63..42a815e 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezRemoteException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -55,18 +55,20 @@ public class DAGClientRPCImpl implements DAGClient, Closeable {
   }
   
   @Override
-  public List<String> getAllDAGs() throws TezException {
+  public List<String> getAllDAGs() throws IOException, TezRemoteException {
     GetAllDAGsRequestProto requestProto = 
         GetAllDAGsRequestProto.newBuilder().build();
     try {
       return proxy.getAllDAGs(null, requestProto).getDagIdList();
     } catch (ServiceException e) {
-      throw new TezException(e);
+      // TEZ-151 retrieve wrapped TezRemoteException
+      throw new TezRemoteException(e);
     }
   }
 
   @Override
-  public DAGStatus getDAGStatus(String dagId) throws TezException {
+  public DAGStatus getDAGStatus(String dagId) 
+                                    throws IOException, TezRemoteException {
     GetDAGStatusRequestProto requestProto = 
         GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
     
@@ -74,13 +76,14 @@ public class DAGClientRPCImpl implements DAGClient, Closeable {
       return new DAGStatus(
                  proxy.getDAGStatus(null, requestProto).getDagStatus());
     } catch (ServiceException e) {
-      throw new TezException(e);
+      // TEZ-151 retrieve wrapped TezRemoteException
+      throw new TezRemoteException(e);
     }
   }
 
   @Override
   public VertexStatus getVertexStatus(String dagId, String vertexName)
-      throws TezException {
+                                    throws IOException, TezRemoteException {
     GetVertexStatusRequestProto requestProto = 
         GetVertexStatusRequestProto.newBuilder().
                         setDagId(dagId).setVertexName(vertexName).build();
@@ -89,7 +92,8 @@ public class DAGClientRPCImpl implements DAGClient, Closeable {
       return new VertexStatus(
                  proxy.getVertexStatus(null, requestProto).getVertexStatus());
     } catch (ServiceException e) {
-      throw new TezException(e);
+      // TEZ-151 retrieve wrapped TezRemoteException
+      throw new TezRemoteException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
new file mode 100644
index 0000000..534fcaa
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -0,0 +1,106 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
+
+import com.google.protobuf.BlockingService;
+
+public class DAGClientServer extends AbstractService {
+  static final Log LOG = LogFactory.getLog(DAGClientServer.class);
+      
+  DAGClient realInstance;
+  Server server;
+  InetSocketAddress bindAddress;
+
+  public DAGClientServer(DAGClient realInstance) {
+    super("DAGClientRPCServer");
+    this.realInstance = realInstance;
+  }
+  
+  @Override
+  public void start() {
+    try {
+      assert getConfig() instanceof TezConfiguration;
+      TezConfiguration conf = (TezConfiguration) getConfig();
+      InetSocketAddress addr = new InetSocketAddress(0);
+      
+      DAGClientAMProtocolBlockingPBServerImpl service = 
+          new DAGClientAMProtocolBlockingPBServerImpl(realInstance);
+      
+      BlockingService blockingService = 
+                DAGClientAMProtocol.newReflectiveBlockingService(service);
+      
+      int numHandlers = conf.getInt(TezConfiguration.DAG_CLIENT_AM_THREAD_COUNT, 
+                          TezConfiguration.DAG_CLIENT_AM__THREAD_COUNT_DEFAULT);
+      
+      String portRange = conf.get(TezConfiguration.DAG_CLIENT_AM_PORT_RANGE);
+      
+      server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf, 
+                            numHandlers, blockingService, portRange);
+      server.start();
+      bindAddress = NetUtils.getConnectAddress(server);
+      LOG.info("Instantiated DAGClientRPCServer at " + bindAddress);
+      super.start();
+    } catch (Exception e) {
+      LOG.error("Failed to start DAGClientServer: ", e);
+      throw new TezException(e);
+    }
+  }
+  
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.stop();
+    }
+    super.stop();
+  }
+  
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+  
+  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration
conf, 
+      int numHandlers, 
+      BlockingService blockingService, String portRangeConfig) throws IOException {
+    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
+    RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
+        .setInstance(blockingService).setBindAddress(addr.getHostName())
+        .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
+        .setPortRangeConfig(portRangeConfig)
+        .build();
+    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
+    return server;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
new file mode 100644
index 0000000..81d7868
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -0,0 +1,103 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client.rpc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tez.dag.api.TezRemoteException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatusBuilder;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class DAGClientAMProtocolBlockingPBServerImpl implements
+    DAGClientAMProtocolBlockingPB {
+  
+  DAGClient real;
+  
+  public DAGClientAMProtocolBlockingPBServerImpl(DAGClient real) {
+    this.real = real;
+  }
+
+  @Override
+  public GetAllDAGsResponseProto getAllDAGs(RpcController controller,
+      GetAllDAGsRequestProto request) throws ServiceException {
+    try{
+      List<String> dagIds = real.getAllDAGs();
+      return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build();
+    } catch(TezRemoteException e) {
+      throw wrapException(e);
+    } catch(IOException e) {
+      throw wrapException(e);
+    }
+  }
+
+  @Override
+  public GetDAGStatusResponseProto getDAGStatus(RpcController controller,
+      GetDAGStatusRequestProto request) throws ServiceException {
+    try {
+      String dagId = request.getDagId();
+      DAGStatus status;
+      status = real.getDAGStatus(dagId);
+      assert status instanceof DAGStatusBuilder;
+      DAGStatusBuilder builder = (DAGStatusBuilder) status;
+      return GetDAGStatusResponseProto.newBuilder().
+                                setDagStatus(builder.getProto()).build();
+    } catch (TezRemoteException e) {
+      throw wrapException(e);
+    } catch(IOException e) {
+      throw wrapException(e);
+    }
+  }
+
+  @Override
+  public GetVertexStatusResponseProto getVertexStatus(RpcController controller,
+      GetVertexStatusRequestProto request) throws ServiceException {
+    try {
+      String dagId = request.getDagId();
+      String vertexName = request.getVertexName();
+      VertexStatus status = real.getVertexStatus(dagId, vertexName);
+      assert status instanceof VertexStatusBuilder;
+      VertexStatusBuilder builder = (VertexStatusBuilder) status;
+      return GetVertexStatusResponseProto.newBuilder().
+                            setVertexStatus(builder.getProto()).build();
+    } catch (TezRemoteException e) {
+      throw wrapException(e);
+    } catch(IOException e) {
+      throw wrapException(e);
+    }
+  }
+  
+  ServiceException wrapException(Exception e){
+    return new ServiceException(e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 804c20c..821dd71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -67,13 +67,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.app.client.ClientService;
-import org.apache.tez.dag.app.client.impl.TezClientService;
+import org.apache.tez.dag.api.TezRemoteException;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
@@ -157,7 +156,6 @@ public class DAGAppMaster extends CompositeService {
   private AppContext context;
   private TezConfiguration conf; 
   private Dispatcher dispatcher;
-  private ClientService clientService;
   // TODO Recovery
   //private Recovery recoveryServ;
   private ContainerLauncher containerLauncher;
@@ -178,7 +176,9 @@ public class DAGAppMaster extends CompositeService {
   private HistoryEventHandler historyEventHandler;
 
   private DAGAppMasterState state;
-  private DAGMonitorServer monitor;
+  
+  DAGClientServer clientRpcServer;
+  private DAGClientHandler clientHandler;
 
   private DAG dag;
   private Credentials fsTokens = new Credentials(); // Filled during init
@@ -230,13 +230,16 @@ public class DAGAppMaster extends CompositeService {
 
     dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
     
-    monitor = new DAGMonitorServer();
+    clientHandler = new DAGClientHandler();
 
     // TODO Committer.
     //    committer = createOutputCommitter(conf);
 
     dispatcher = createDispatcher();
     addIfService(dispatcher);
+    
+    clientRpcServer = new DAGClientServer(clientHandler);
+    addIfService(clientRpcServer);
 
     taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
     addIfService(taskHeartbeatHandler);
@@ -262,11 +265,6 @@ public class DAGAppMaster extends CompositeService {
     taskCleaner = createTaskCleaner(context);
     addIfService(taskCleaner);
 
-    // TODO TEZ-9
-    //service to handle requests from JobClient
-    clientService = new TezClientService();
-    addIfService(clientService);
-
     this.dagEventDispatcher = new DagEventDispatcher();
     this.vertexEventDispatcher = new VertexEventDispatcher();
 
@@ -287,7 +285,7 @@ public class DAGAppMaster extends CompositeService {
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
 
     taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
-        clientService);
+        clientRpcServer);
     addIfService(taskSchedulerEventHandler);
     dispatcher.register(AMSchedulerEventType.class,
         taskSchedulerEventHandler);
@@ -712,30 +710,37 @@ public class DAGAppMaster extends CompositeService {
     LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
   }
   
-  class DAGMonitorServer implements DAGClient {
+  class DAGClientHandler implements DAGClient {
 
     @Override
-    public List<String> getAllDAGs() {
+    public List<String> getAllDAGs() throws TezRemoteException {
       return Collections.singletonList(dag.getID().toString());
     }
 
     @Override
-    public DAGStatus getDAGStatus(String dagIdStr) throws TezException {
+    public DAGStatus getDAGStatus(String dagIdStr) 
+                                      throws IOException, TezRemoteException {
       return getDAG(dagIdStr).getDAGStatus();
     }
 
     @Override
-    public VertexStatus getVertexStatus(String dagIdStr, String vertexName) {
-      return getDAG(dagIdStr).getVertexStatus(vertexName);
+    public VertexStatus getVertexStatus(String dagIdStr, String vertexName) 
+        throws IOException, TezRemoteException{
+      VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName);
+      if(status == null) {
+        throw new TezRemoteException("Unknown vertexName: " + vertexName);
+      }
+      
+      return status;
     }
     
-    DAG getDAG(String dagIdStr) {
+    DAG getDAG(String dagIdStr) throws IOException, TezRemoteException {
       TezDAGID dagId = TezDAGID.fromString(dagIdStr);
       if(dagId == null) {
-        throw new TezException("Bad dagId: " + dagIdStr);
+        throw new TezRemoteException("Bad dagId: " + dagIdStr);
       }
       if(!dagId.equals(dag.getID())) {
-        throw new TezException("Unknown dagId: " + dagIdStr);
+        throw new TezRemoteException("Unknown dagId: " + dagIdStr);
       }
       return dag;
     }
@@ -1083,7 +1088,9 @@ public class DAGAppMaster extends CompositeService {
       // that it doesnt take too long in shutting down
 
       // Signal the task scheduler.
-      appMaster.taskSchedulerEventHandler.setSignalled(true);
+      if(appMaster.getServiceState() == STATE.STARTED) {
+        appMaster.taskSchedulerEventHandler.setSignalled(true);
+      }
       if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED,
           DAGAppMasterState.RUNNING).contains(appMaster.state)) {
         // DAG not in a final state. Must have receive a KILL signal

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
deleted file mode 100644
index 64937a9..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.client;
-
-import java.net.InetSocketAddress;
-
-// TODONOTES - RPC service for clients
-public interface ClientService {
-
-  InetSocketAddress getBindAddress();
-
-  int getHttpPort();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
deleted file mode 100644
index 8fd79f0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.client.impl;
-
-import java.net.InetSocketAddress;
-
-import org.apache.tez.dag.app.client.ClientService;
-
-public class TezClientService implements ClientService {
-
-  // TODO remove dummy client service
-  private final InetSocketAddress dummySocketAddress =
-      new InetSocketAddress(0);
-
-  @Override
-  public InetSocketAddress getBindAddress() {
-    // TODO Auto-generated method stub
-    return dummySocketAddress;
-  }
-
-  @Override
-  public int getHttpPort() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
deleted file mode 100644
index a4cd9cc..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@InterfaceAudience.Private
-package org.apache.tez.dag.app.client;
-import org.apache.hadoop.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 85ba0b2..c61cbcc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -38,11 +38,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.client.ClientService;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -69,31 +69,24 @@ public class TaskSchedulerEventHandler extends AbstractService
   protected final AppContext appContext;
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
-  private final TaskScheduler taskScheduler;
-  // TODO change this to DAGAppMaster
+  private TaskScheduler taskScheduler;
   private DAGAppMaster dagAppMaster;
   private Map<ApplicationAccessType, String> appAcls = null;
   private Thread eventHandlingThread;
   private volatile boolean stopEventHandling;
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
+  final DAGClientServer clientService;
 
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
   public TaskSchedulerEventHandler(AppContext appContext,
-      ClientService clientService) {
+      DAGClientServer clientService) {
     super(TaskSchedulerEventHandler.class.getName());
     this.appContext = appContext;
-    eventHandler = appContext.getEventHandler();
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
-    taskScheduler = 
-        new TaskScheduler(appContext.getApplicationAttemptId(),
-                          this,
-                          serviceAddr.getHostName(),
-                          serviceAddr.getPort(),
-                          serviceAddr.getHostName() + 
-                            ":" + clientService.getHttpPort());
+    this.eventHandler = appContext.getEventHandler();
+    this.clientService = clientService;
   }
   
   public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -346,14 +339,22 @@ public class TaskSchedulerEventHandler extends AbstractService
   @Override
   public synchronized void init(Configuration conf) {
     super.init(conf);
-    taskScheduler.init(conf);
-    // todo set heartbeat value from conf here
   }
   
   @Override
   public synchronized void start() {
     // FIXME hack alert how is this supposed to support multiple DAGs?
     // Answer: this is shared across dags. need job==app-dag-master
+    // TODO set heartbeat value from conf here
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    taskScheduler = 
+        new TaskScheduler(appContext.getApplicationAttemptId(),
+                          this,
+                          serviceAddr.getHostName(),
+                          serviceAddr.getPort(),
+                          "");
+    taskScheduler.init(getConfig());
+
     dagAppMaster = appContext.getAppMaster();
     taskScheduler.start();
     this.eventHandlingThread = new Thread() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
index 9cdc845..44ccc07 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
@@ -18,34 +18,47 @@
 
 package org.apache.tez.mapreduce;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezRemoteException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
 
 import java.io.IOException;
 
 public class ClientServiceDelegate {
+  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
 
-  private final Configuration conf;
+  private final TezConfiguration conf;
   private final ResourceMgrDelegate rm;
+  private DAGClient dagClient;
+  private String currentDAGId;
+  private TezClient tezClient;
+  private ApplicationReport appReport;
 
   // FIXME
   // how to handle completed jobs that the RM does not know about?
 
   public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
       JobID jobId) {
-    this.conf = new Configuration(conf); // Cloning for modifying.
+    this.conf = new TezConfiguration(conf); // Cloning for modifying.
     // For faster redirects from AM to HS.
     this.conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
         this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
             MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
     this.rm = rm;
+    tezClient = new TezClient(this.conf);
   }
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
@@ -74,17 +87,42 @@ public class ClientServiceDelegate {
   }
   
   public JobStatus getJobStatus(JobID oldJobID) throws IOException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
-      TypeConverter.toYarn(oldJobID);
-    ApplicationReport appReport;
     try {
-      appReport = rm.getApplicationReport(jobId.getAppId());
-    } catch (YarnRemoteException e) {
-      throw new IOException(e);
+      if(dagClient == null) {
+        appReport = getAppReport(oldJobID);
+        if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+          // if job not running return status from appReport;
+          return getJobStatusFromRM(appReport);
+        } else {
+          // job is running. create dag am client
+          dagClient = tezClient.getDAGClient(appReport.getHost(), 
+                                             appReport.getRpcPort());
+          currentDAGId = dagClient.getAllDAGs().get(0);
+        }
+      }
+      // return status from client. use saved appReport for queue etc
+      DAGStatus dagStatus = dagClient.getDAGStatus(currentDAGId);
+      return new DAGJobStatus(appReport, dagStatus);
+    } catch (TezRemoteException e) {
+      // AM not responding
+      dagClient = null;
+      currentDAGId = null;
+      appReport = getAppReport(oldJobID);
+      if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+        LOG.info("App not running. Falling back to RM for report.");
+      } else {
+        LOG.warn("App running but failed to get report from AM.", e);
+      }
     }
+    
+    // final fallback
+    return getJobStatusFromRM(appReport);
+  }
+  
+  private JobStatus getJobStatusFromRM(ApplicationReport appReport) {
     JobStatus jobStatus =
-        new DAGJobStatus(appReport);
-    return jobStatus;
+        new DAGJobStatus(appReport, null);
+    return jobStatus;            
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
@@ -114,4 +152,14 @@ public class ClientServiceDelegate {
     // FIXME logs for an attempt?
     throw new UnsupportedOperationException();
   }
+  
+  private ApplicationReport getAppReport(JobID oldJobID) throws IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+      TypeConverter.toYarn(oldJobID);
+    try {
+      return rm.getApplicationReport(jobId.getAppId());
+    } catch (YarnRemoteException e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
index d58cea4..08c61f8 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
@@ -32,14 +32,20 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.mortbay.log.Log;
 
 public class DAGJobStatus extends JobStatus {
 
   private final ApplicationReport report;
+  private final DAGStatus dagStatus;
   
-  public DAGJobStatus(ApplicationReport appReport) {
+  public DAGJobStatus(ApplicationReport appReport, DAGStatus dagStatus) {
     super();
     this.report = appReport;
+    this.dagStatus = dagStatus;
   }
   
   @Override
@@ -129,6 +135,9 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized float getMapProgress() {
+    if(dagStatus != null) {
+      return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
+    }
     if (report.getYarnApplicationState().equals(
         YarnApplicationState.FINISHED)
         && report.getFinalApplicationStatus().equals(
@@ -160,6 +169,9 @@ public class DAGJobStatus extends JobStatus {
 
   @Override
   public synchronized float getReduceProgress() {
+    if(dagStatus != null) {
+      return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
+    }
     if (report.getYarnApplicationState().equals(
         YarnApplicationState.FINISHED)
         && report.getFinalApplicationStatus().equals(
@@ -350,5 +362,18 @@ public class DAGJobStatus extends JobStatus {
     buffer.append("needed-mem" + getNeededMem());
     return buffer.toString();
   }
+  
+  private float getProgress(String vertexName) {
+    Progress progress = dagStatus.getVertexProgress().get(vertexName);
+    if(progress == null) {
+      // no such stage. return 0 like MR app currently does.
+      return 0;
+    }
+    float totalTasks = (float) progress.getTotalTaskCount();
+    if(totalTasks != 0) {
+      return progress.getSucceededTaskCount()/totalTasks;
+    }
+    return 1;
+  }
 
 }


Mime
View raw message