tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [2/2] git commit: TEZ-12. Support for counters. (hitesh)
Date Wed, 06 Nov 2013 00:35:17 GMT
TEZ-12. Support for counters. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6fddbd01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6fddbd01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6fddbd01

Branch: refs/heads/master
Commit: 6fddbd01b5050d2bd58b4edbd0464798e1b10a1f
Parents: 83a657b
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Nov 5 16:34:57 2013 -0800
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Nov 5 16:34:57 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/DagTypeConverters.java   | 100 ++++++++++++++++-
 .../apache/tez/dag/api/client/DAGClient.java    |  15 ++-
 .../apache/tez/dag/api/client/DAGStatus.java    |  27 ++++-
 .../tez/dag/api/client/StatusGetOpts.java       |  28 +++++
 .../apache/tez/dag/api/client/VertexStatus.java |  43 ++++++--
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |  57 +++++++---
 tez-api/src/main/proto/DAGApiRecords.proto      |  42 +++++--
 .../src/main/proto/DAGClientAMProtocol.proto    |   2 +
 .../tez/dag/api/client/DAGStatusBuilder.java    |  21 ++--
 .../tez/dag/api/client/ProgressBuilder.java     |  15 ++-
 .../tez/dag/api/client/VertexStatusBuilder.java |  20 +++-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |   8 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  26 +++--
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   7 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  11 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  20 ++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  80 +++++++-------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 109 ++++++++++---------
 .../tez/mapreduce/examples/ExampleDriver.java   |  41 ++++++-
 .../mapreduce/examples/FilterLinesByWord.java   |  27 ++---
 .../examples/GroupByOrderByMRRTest.java         |  11 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   2 +-
 .../mapreduce/examples/OrderedWordCount.java    |  34 ++++--
 .../apache/tez/mapreduce/client/YARNRunner.java |   2 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   4 +-
 25 files changed, 529 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 803c943..098a201 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -20,8 +20,11 @@ package org.apache.tez.dag.api;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -32,11 +35,16 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -50,6 +58,9 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 
 import com.google.protobuf.ByteString;
@@ -166,9 +177,8 @@ public class DagTypeConverters {
 
   public static String convertToDAGPlan(URL resource) {
     // see above notes on HDFS URL handling
-    String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
-        + resource.getFile();
-    return out;
+    return resource.getScheme() + "://" + resource.getHost()
+        + ":" + resource.getPort() + resource.getFile();
   }
 
   public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
@@ -380,4 +390,88 @@ public class DagTypeConverters {
       plr.hasPattern() ? plr.getPattern() : null);
   }
 
+  public static TezCounters convertTezCountersFromProto(TezCountersProto proto) {
+    TezCounters counters = new TezCounters();
+    for (TezCounterGroupProto counterGroupProto : proto.getCounterGroupsList()) {
+      CounterGroup group = counters.addGroup(counterGroupProto.getName(),
+        counterGroupProto.getDisplayName());
+      for (TezCounterProto counterProto :
+        counterGroupProto.getCountersList()) {
+        TezCounter counter = group.findCounter(
+          counterProto.getName(),
+          counterProto.getDisplayName());
+        counter.setValue(counterProto.getValue());
+      }
+    }
+    return counters;
+  }
+
+  public static TezCountersProto convertTezCountersToProto(
+      TezCounters counters) {
+    TezCountersProto.Builder builder = TezCountersProto.newBuilder();
+    Iterator<CounterGroup> groupIterator = counters.iterator();
+    int groupIndex = 0;
+    while (groupIterator.hasNext()) {
+      CounterGroup counterGroup = groupIterator.next();
+      TezCounterGroupProto.Builder groupBuilder =
+        TezCounterGroupProto.newBuilder();
+      groupBuilder.setName(counterGroup.getName());
+      groupBuilder.setDisplayName(counterGroup.getDisplayName());
+      Iterator<TezCounter> counterIterator = counterGroup.iterator();
+      int counterIndex = 0;
+      while (counterIterator.hasNext()) {
+        TezCounter counter = counterIterator.next();
+        TezCounterProto tezCounterProto = TezCounterProto.newBuilder()
+          .setName(counter.getName())
+          .setDisplayName(counter.getDisplayName())
+          .setValue(counter.getValue())
+          .build();
+        groupBuilder.addCounters(counterIndex, tezCounterProto);
+        ++counterIndex;
+      }
+      builder.addCounterGroups(groupIndex, groupBuilder.build());
+      ++groupIndex;
+    }
+    return builder.build();
+  }
+
+  public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
+    StatusGetOpts statusGetOpts) {
+    switch (statusGetOpts) {
+      case GET_COUNTERS:
+        return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
+    }
+    throw new TezUncheckedException("Could not convert StatusGetOpts to"
+      + " proto");
+  }
+
+  public static StatusGetOpts convertStatusGetOptsFromProto(
+    DAGProtos.StatusGetOptsProto proto) {
+    switch (proto) {
+      case GET_COUNTERS:
+        return StatusGetOpts.GET_COUNTERS;
+    }
+    throw new TezUncheckedException("Could not convert to StatusGetOpts from"
+      + " proto");
+  }
+
+  public static List<DAGProtos.StatusGetOptsProto> convertStatusGetOptsToProto(
+    Set<StatusGetOpts> statusGetOpts) {
+    List<DAGProtos.StatusGetOptsProto> protos =
+      new ArrayList<DAGProtos.StatusGetOptsProto>(statusGetOpts.size());
+    for (StatusGetOpts opt : statusGetOpts) {
+      protos.add(convertStatusGetOptsToProto(opt));
+    }
+    return protos;
+  }
+
+  public static Set<StatusGetOpts> convertStatusGetOptsFromProto(
+      List<DAGProtos.StatusGetOptsProto> protoList) {
+    Set<StatusGetOpts> opts = new TreeSet<StatusGetOpts>();
+    for (DAGProtos.StatusGetOptsProto proto : protoList) {
+      opts.add(convertStatusGetOptsFromProto(proto));
+    }
+    return opts;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 9062e8e..bbb225c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -49,19 +50,25 @@ public interface DAGClient extends Closeable {
 
   /**
    * Get the status of the specified DAG
+   * @param statusOptions Optionally, retrieve additional information based on
+   *                      specified options
    */
-  public DAGStatus getDAGStatus() throws IOException, TezException;
+  public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException;
 
   /**
    * Get the status of a Vertex of a DAG
+   * @param statusOptions Optionally, retrieve additional information based on
+   *                      specified options
    */
-  public VertexStatus getVertexStatus(String vertexName)
-      throws IOException, TezException;
+  public VertexStatus getVertexStatus(String vertexName,
+      Set<StatusGetOpts> statusOptions)
+    throws IOException, TezException;
 
   /**
    * Kill a running DAG
    *
    */
-  public void tryKillDAG() throws TezException, IOException;
+  public void tryKillDAG() throws IOException, TezException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index 8b7277f..b2a4d21 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -21,8 +21,11 @@ package org.apache.tez.dag.api.client;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -40,11 +43,13 @@ public class DAGStatus {
     KILLED,
     FAILED,
     ERROR,
-  };
+  }
 
   DAGStatusProtoOrBuilder proxy = null;
   Progress progress = null;
   Map<String, Progress> vertexProgress = null;
+  TezCounters dagCounters = null;
+  AtomicBoolean countersInitialized = new AtomicBoolean(false);
 
   public DAGStatus(DAGStatusProtoOrBuilder proxy) {
     this.proxy = proxy;
@@ -123,13 +128,27 @@ public class DAGStatus {
     return vertexProgress;
   }
 
+  public TezCounters getDAGCounters() {
+    if (countersInitialized.get()) {
+      return dagCounters;
+    }
+    if (proxy.hasDagCounters()) {
+      dagCounters = DagTypeConverters.convertTezCountersFromProto(
+        proxy.getDagCounters());
+    }
+    countersInitialized.set(true);
+    return dagCounters;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("status=" + getState()
-        + ", progress=" + getDAGProgress()
-        + ", diagnostics="
-        + StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+      + ", progress=" + getDAGProgress()
+      + ", diagnostics="
+      + StringUtils.join(LINE_SEPARATOR, getDiagnostics())
+      + ", counters="
+      + (dagCounters == null ? "null" : dagCounters.toString()));
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
new file mode 100644
index 0000000..922ab24
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+/**
+ * Status Get Options used when making calls like getDAGStatus and
+ * getVertexStatus in DAGClient
+ */
+public enum StatusGetOpts {
+  /** Retrieve Counters with Status */
+  GET_COUNTERS
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index ce5dbe0..5ea190f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -19,12 +19,15 @@
 package org.apache.tez.dag.api.client;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 public class VertexStatus {
-  
+
   public enum State {
     INITED,
     RUNNING,
@@ -33,11 +36,13 @@ public class VertexStatus {
     FAILED,
     ERROR,
     TERMINATING,
-  };
-  
+  }
+
   VertexStatusProtoOrBuilder proxy = null;
   Progress progress = null;
-  
+  TezCounters vertexCounters = null;
+  private AtomicBoolean countersInitialized = new AtomicBoolean(false);
+
   public VertexStatus(VertexStatusProtoOrBuilder proxy) {
     this.proxy = proxy;
   }
@@ -59,9 +64,9 @@ public class VertexStatus {
     case VERTEX_TERMINATING:
       return VertexStatus.State.TERMINATING;
     default:
-      throw new TezUncheckedException("Unsupported value for VertexStatus.State : " + 
-                              proxy.getState());
-    }    
+      throw new TezUncheckedException(
+        "Unsupported value for VertexStatus.State : " + proxy.getState());
+    }
   }
 
   public List<String> getDiagnostics() {
@@ -72,7 +77,29 @@ public class VertexStatus {
     if(progress == null && proxy.hasProgress()) {
       progress = new Progress(proxy.getProgress());
     }
-    return progress;    
+    return progress;
+  }
+
+  public TezCounters getVertexCounters() {
+    if (countersInitialized.get()) {
+      return vertexCounters;
+    }
+    if (proxy.hasVertexCounters()) {
+      vertexCounters = DagTypeConverters.convertTezCountersFromProto(
+        proxy.getVertexCounters());
+    }
+    countersInitialized.set(true);
+    return vertexCounters;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("status=" + getState()
+      + ", progress=" + getProgress()
+      + ", counters="
+      + (vertexCounters == null ? "null" : vertexCounters.toString()));
+    return sb.toString();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index dae5625..06cebca 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.api.client.rpc;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
@@ -74,10 +77,11 @@ public class DAGClientRPCImpl implements DAGClient {
   }
 
   @Override
-  public DAGStatus getDAGStatus() throws IOException, TezException {
+  public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
     if(createAMProxyIfNeeded()) {
       try {
-        return getDAGStatusViaAM();
+        return getDAGStatusViaAM(statusOptions);
       } catch (TezException e) {
         resetProxy(e); // create proxy again
       }
@@ -88,11 +92,12 @@ public class DAGClientRPCImpl implements DAGClient {
   }
 
   @Override
-  public VertexStatus getVertexStatus(String vertexName)
-                                    throws IOException, TezException {
+  public VertexStatus getVertexStatus(String vertexName,
+      Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
     if(createAMProxyIfNeeded()) {
       try {
-        return getVertexStatusViaAM(vertexName);
+        return getVertexStatusViaAM(vertexName, statusOptions);
       } catch (TezException e) {
         resetProxy(e); // create proxy again
       }
@@ -102,6 +107,8 @@ public class DAGClientRPCImpl implements DAGClient {
     return null;
   }
 
+
+
   @Override
   public void tryKillDAG() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
@@ -141,23 +148,30 @@ public class DAGClientRPCImpl implements DAGClient {
     proxy = null;
   }
 
-  DAGStatus getDAGStatusViaAM() throws IOException, TezException {
+  DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
     }
-    GetDAGStatusRequestProto requestProto =
-        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+    GetDAGStatusRequestProto.Builder requestProtoBuilder =
+        GetDAGStatusRequestProto.newBuilder()
+          .setDagId(dagId);
+
+    if (statusOptions != null) {
+      requestProtoBuilder.addAllStatusOptions(
+        DagTypeConverters.convertStatusGetOptsToProto(statusOptions));
+    }
+
     try {
       return new DAGStatus(
-                 proxy.getDAGStatus(null, requestProto).getDagStatus());
+        proxy.getDAGStatus(null,
+          requestProtoBuilder.build()).getDagStatus());
     } catch (ServiceException e) {
       // TEZ-151 retrieve wrapped TezException
       throw new TezException(e);
     }
   }
 
-
-
   DAGStatus getDAGStatusViaRM() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
@@ -175,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient {
 
     DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
     DAGStatus dagStatus = new DAGStatus(builder);
-    DAGStatusStateProto dagState = null;
+    DAGStatusStateProto dagState;
     switch (appReport.getYarnApplicationState()) {
     case NEW:
     case NEW_SAVING:
@@ -224,18 +238,27 @@ public class DAGClientRPCImpl implements DAGClient {
     return dagStatus;
   }
 
-  VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
+  VertexStatus getVertexStatusViaAM(String vertexName,
+      Set<StatusGetOpts> statusOptions)
+      throws TezException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
           + " vertex: " + vertexName);
     }
-    GetVertexStatusRequestProto requestProto =
-        GetVertexStatusRequestProto.newBuilder().
-                        setDagId(dagId).setVertexName(vertexName).build();
+    GetVertexStatusRequestProto.Builder requestProtoBuilder =
+        GetVertexStatusRequestProto.newBuilder()
+          .setDagId(dagId)
+          .setVertexName(vertexName);
+
+    if (statusOptions != null) {
+      requestProtoBuilder.addAllStatusOptions(
+        DagTypeConverters.convertStatusGetOptsToProto(statusOptions));
+    }
 
     try {
       return new VertexStatus(
-                 proxy.getVertexStatus(null, requestProto).getVertexStatus());
+        proxy.getVertexStatus(null,
+          requestProtoBuilder.build()).getVertexStatus());
     } catch (ServiceException e) {
       // TEZ-151 retrieve wrapped TezException
       throw new TezException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index b948e60..9ce51a1 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -22,12 +22,12 @@ option java_generate_equals_and_hash = true;
 
 // DAG plan messages
 
-// Many of these types have a dual in the Tez-api.  To reduce confusion, these types have prefix or suffix 
+// Many of these types have a dual in the Tez-api.  To reduce confusion, these types have prefix or suffix
 // of "Plan" to indicate they are to be used in the dag-plan.
-// The big types use a suffix:  JobPlan, VertexPlan, EdgePlan 
+// The big types use a suffix:  JobPlan, VertexPlan, EdgePlan
 //   --> these get more direct use in the runtime and the naming is natural.
 // The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPaatern, etc
-//   --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable. 
+//   --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable.
 
 enum PlanVertexType {
   INPUT = 0;
@@ -94,7 +94,7 @@ message PlanTaskConfiguration {
   required string javaOpts = 4;
   required string taskModule = 5;
   repeated PlanLocalResource localResource = 6;
-  repeated PlanKeyValuePair environmentSetting = 8;  
+  repeated PlanKeyValuePair environmentSetting = 7;
 }
 
 message TezEntityDescriptorProto {
@@ -113,11 +113,11 @@ message VertexPlan {
   required PlanVertexType type = 2;
   optional TezEntityDescriptorProto processor_descriptor = 3;
   required PlanTaskConfiguration taskConfig = 4;
-  repeated PlanTaskLocationHint taskLocationHint = 7;
-  repeated string inEdgeId = 8;
-  repeated string outEdgeId = 9;
-  repeated RootInputLeafOutputProto inputs = 10;
-  repeated RootInputLeafOutputProto outputs = 11;
+  repeated PlanTaskLocationHint taskLocationHint = 5;
+  repeated string inEdgeId = 6;
+  repeated string outEdgeId = 7;
+  repeated RootInputLeafOutputProto inputs = 8;
+  repeated RootInputLeafOutputProto outputs = 9;
 }
 
 message EdgePlan {
@@ -165,6 +165,7 @@ message VertexStatusProto {
   optional VertexStatusStateProto state = 1;
   repeated string diagnostics = 2;
   optional ProgressProto progress = 3;
+  optional TezCountersProto vertexCounters = 4;
 }
 
 enum DAGStatusStateProto {
@@ -187,9 +188,30 @@ message DAGStatusProto {
   optional DAGStatusStateProto state = 1;
   repeated string diagnostics = 2;
   optional ProgressProto DAGProgress = 3;
-  repeated StringProgressPairProto vertexProgress = 4;  
+  repeated StringProgressPairProto vertexProgress = 4;
+  optional TezCountersProto dagCounters = 5;
 }
 
 message PlanLocalResourcesProto {
   repeated PlanLocalResource localResources = 1;
 }
+
+message TezCounterProto {
+  optional string name = 1;
+  optional string display_name = 2;
+  optional int64 value = 3;
+}
+
+message TezCounterGroupProto {
+  optional string name = 1;
+  optional string display_name = 2;
+  repeated TezCounterProto counters = 3;
+}
+
+message TezCountersProto {
+  repeated TezCounterGroupProto counter_groups = 1;
+}
+
+enum StatusGetOptsProto {
+  GET_COUNTERS = 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 1236190..0f29364 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -34,6 +34,7 @@ message GetAllDAGsResponseProto {
 
 message GetDAGStatusRequestProto {
   optional string dagId = 1;
+  repeated StatusGetOptsProto statusOptions = 3;
 }
 
 message GetDAGStatusResponseProto {
@@ -43,6 +44,7 @@ message GetDAGStatusResponseProto {
 message GetVertexStatusRequestProto {
   optional string dagId = 1;
   optional string vertexName = 2;
+  repeated StatusGetOptsProto statusOptions = 3;
 }
 
 message GetVertexStatusResponseProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
index 2b0f543..62b1399 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client;
 
 import java.util.List;
 
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
 import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -33,32 +35,37 @@ public class DAGStatusBuilder extends DAGStatus {
   public DAGStatusBuilder() {
     super(DAGStatusProto.newBuilder());
   }
-  
+
   public void setState(DAGState state) {
     getBuilder().setState(getProtoState(state));
   }
-  
+
   public void setDiagnostics(List<String> diagnostics) {
     Builder builder = getBuilder();
     builder.clearDiagnostics();
     builder.addAllDiagnostics(diagnostics);
   }
-  
+
   public void setDAGProgress(ProgressBuilder progress) {
     getBuilder().setDAGProgress(progress.getProto());
   }
-  
+
+  public void setDAGCounters(TezCounters counters) {
+    getBuilder().setDagCounters(
+        DagTypeConverters.convertTezCountersToProto(counters));
+  }
+
   public void addVertexProgress(String name, ProgressBuilder progress) {
     StringProgressPairProto.Builder builder = StringProgressPairProto.newBuilder();
     builder.setKey(name);
     builder.setProgress(progress.getProto());
     getBuilder().addVertexProgress(builder.build());
   }
-  
+
   public DAGStatusProto getProto() {
     return getBuilder().build();
   }
-  
+
   private DAGStatusStateProto getProtoState(DAGState state) {
     switch(state) {
     case NEW:
@@ -80,7 +87,7 @@ public class DAGStatusBuilder extends DAGStatus {
       throw new TezUncheckedException("Unsupported value for DAGState : " + state);
     }
   }
-  
+
   private DAGStatusProto.Builder getBuilder() {
     return (Builder) this.proxy;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
index 6cedb3f..99fcfa0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java
@@ -20,38 +20,37 @@ package org.apache.tez.dag.api.client;
 
 import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
 import org.apache.tez.dag.api.records.DAGProtos.ProgressProto.Builder;
-import org.apache.tez.dag.api.client.Progress;
 
 public class ProgressBuilder extends Progress {
 
   public ProgressBuilder() {
     super(ProgressProto.newBuilder());
   }
-  
+
   public ProgressProto getProto() {
     return getBuilder().build();
   }
-  
+
   public void setTotalTaskCount(int count) {
     getBuilder().setTotalTaskCount(count);
   }
-  
+
   public void setSucceededTaskCount(int count) {
     getBuilder().setSucceededTaskCount(count);
   }
-  
+
   public void setRunningTaskCount(int count) {
     getBuilder().setRunningTaskCount(count);
   }
-  
+
   public void setFailedTaskCount(int count) {
     getBuilder().setFailedTaskCount(count);
   }
-  
+
   public void setKilledTaskCount(int count) {
     getBuilder().setKilledTaskCount(count);
   }
-  
+
   private ProgressProto.Builder getBuilder() {
     return (Builder) this.proxy;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 66de71f..47bbb2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client;
 
 import java.util.List;
 
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto.Builder;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
@@ -32,28 +34,34 @@ public class VertexStatusBuilder extends VertexStatus {
   public VertexStatusBuilder() {
     super(VertexStatusProto.newBuilder());
   }
-  
+
   public void setState(VertexState state) {
     getBuilder().setState(getProtoState(state));
   }
-  
+
   public void setDiagnostics(List<String> diagnostics) {
     Builder builder = getBuilder();
     builder.clearDiagnostics();
     builder.addAllDiagnostics(diagnostics);
   }
-  
+
   public void setProgress(ProgressBuilder progress) {
     getBuilder().setProgress(progress.getProto());
   }
-  
+
+  public void setVertexCounters(TezCounters counters) {
+    getBuilder().setVertexCounters(
+      DagTypeConverters.convertTezCountersToProto(counters));
+  }
+
   public VertexStatusProto getProto() {
     return getBuilder().build();
   }
-  
+
   private VertexStatusStateProto getProtoState(VertexState state) {
     switch(state) {
     case NEW:
+    case INITIALIZING:
     case INITED:
       return VertexStatusStateProto.VERTEX_INITED;
     case RUNNING:
@@ -72,7 +80,7 @@ public class VertexStatusBuilder extends VertexStatus {
       throw new TezUncheckedException("Unsupported value for VertexState : " + state);
     }
   }
-  
+
   private VertexStatusProto.Builder getBuilder() {
     return (Builder) this.proxy;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index eb1ff48..959fbbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -74,7 +74,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
     try {
       String dagId = request.getDagId();
       DAGStatus status;
-      status = real.getDAGStatus(dagId);
+      status = real.getDAGStatus(dagId,
+        DagTypeConverters.convertStatusGetOptsFromProto(
+          request.getStatusOptionsList()));
       assert status instanceof DAGStatusBuilder;
       DAGStatusBuilder builder = (DAGStatusBuilder) status;
       return GetDAGStatusResponseProto.newBuilder().
@@ -90,7 +92,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
     try {
       String dagId = request.getDagId();
       String vertexName = request.getVertexName();
-      VertexStatus status = real.getVertexStatus(dagId, vertexName);
+      VertexStatus status = real.getVertexStatus(dagId, vertexName,
+        DagTypeConverters.convertStatusGetOptsFromProto(
+          request.getStatusOptionsList()));
       assert status instanceof VertexStatusBuilder;
       VertexStatusBuilder builder = (VertexStatusBuilder) status;
       return GetVertexStatusResponseProto.newBuilder().

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6d2ec19..10d05ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -85,6 +86,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -146,7 +148,7 @@ import org.apache.tez.runtime.library.common.security.TokenCache;
 
 @SuppressWarnings("rawtypes")
 public class DAGAppMaster extends AbstractService {
- 
+
   private static final Log LOG = LogFactory.getLog(DAGAppMaster.class);
 
   /**
@@ -474,7 +476,7 @@ public class DAGAppMaster extends AbstractService {
   protected DAG createDAG(DAGPlan dagPB) {
     TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
         dagCounter.incrementAndGet());
-    
+
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
     String dagIdString = dagId.toString();
@@ -500,7 +502,7 @@ public class DAGAppMaster extends AbstractService {
 
     return newDag;
   } // end createDag()
-  
+
   protected void addIfService(Object object, boolean addDispatcher) {
     if (object instanceof Service) {
       Service service = (Service) object;
@@ -712,13 +714,17 @@ public class DAGAppMaster extends AbstractService {
       return Collections.singletonList(currentDAG.getID().toString());
     }
 
-    public DAGStatus getDAGStatus(String dagIdStr) throws TezException {
-      return getDAG(dagIdStr).getDAGStatus();
+    public DAGStatus getDAGStatus(String dagIdStr,
+                                  Set<StatusGetOpts> statusOptions)
+        throws TezException {
+      return getDAG(dagIdStr).getDAGStatus(statusOptions);
     }
 
-    public VertexStatus getVertexStatus(String dagIdStr, String vertexName)
+    public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
+        Set<StatusGetOpts> statusOptions)
         throws TezException{
-      VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName);
+      VertexStatus status = getDAG(dagIdStr)
+          .getVertexStatus(vertexName, statusOptions);
       if(status == null) {
         throw new TezException("Unknown vertexName: " + vertexName);
       }
@@ -1414,11 +1420,11 @@ public class DAGAppMaster extends AbstractService {
     appMaster.currentUser = UserGroupInformation.getCurrentUser();
         Credentials credentials =
         UserGroupInformation.getCurrentUser().getCredentials();
-    
+
     UserGroupInformation appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
     appMasterUgi.addCredentials(credentials);
-    
+
     // Now remove the AM->RM token so tasks don't have it
     Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
     while (iter.hasNext()) {
@@ -1427,7 +1433,7 @@ public class DAGAppMaster extends AbstractService {
         iter.remove();
       }
     }
-    
+
     appMaster.tokens = credentials;
 
     appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index ce1ee89..4e12603 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -20,11 +20,13 @@ package org.apache.tez.dag.app.dag;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
@@ -66,8 +68,9 @@ public interface DAG {
   Configuration getConf();
 
   DAGPlan getJobPlan();
-  DAGStatusBuilder getDAGStatus();
-  VertexStatusBuilder getVertexStatus(String vertexName);
+  DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
+  VertexStatusBuilder getVertexStatus(String vertexName,
+                                      Set<StatusGetOpts> statusOptions);
 
   boolean isComplete();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index caab317..737091a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
@@ -28,6 +29,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -68,7 +70,8 @@ public interface Vertex extends Comparable<Vertex> {
   int getRunningTasks();
   float getProgress();
   ProgressBuilder getVertexProgress();
-  VertexStatusBuilder getVertexStatus();
+  VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions);
+
 
   void setParallelism(int parallelism, Map<Vertex, EdgeManager> sourceEdgeManagers);
   void setVertexLocationHint(VertexLocationHint vertexLocationHint);
@@ -79,13 +82,13 @@ public interface Vertex extends Comparable<Vertex> {
 
   Map<Vertex, Edge> getInputVertices();
   Map<Vertex, Edge> getOutputVertices();
-  
+
   void setAdditionalInputs(List<RootInputLeafOutputProto> inputs);
   void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs);
-  
+
   Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs();
   Map<String, RootInputLeafOutputDescriptor<OutputDescriptor>> getAdditionalOutputs();
-  
+
   List<InputSpec> getInputSpecList(int taskIndex);
   List<OutputSpec> getOutputSpecList(int taskIndex);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0df7875..d16086b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
@@ -510,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   // monitoring apis
   @Override
-  public DAGStatusBuilder getDAGStatus() {
+  public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
     DAGStatusBuilder status = new DAGStatusBuilder();
     int totalTaskCount = 0;
     int totalSucceededTaskCount = 0;
@@ -537,6 +538,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       status.setState(getState());
       status.setDiagnostics(diagnostics);
       status.setDAGProgress(dagProgress);
+      if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+        status.setDAGCounters(getAllCounters());
+      }
       return status;
     } finally {
       readLock.unlock();
@@ -544,12 +548,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
-  public VertexStatusBuilder getVertexStatus(String vertexName) {
+  public VertexStatusBuilder getVertexStatus(String vertexName,
+      Set<StatusGetOpts> statusOptions) {
     Vertex vertex = vertexMap.get(vertexName);
     if(vertex == null) {
       return null;
     }
-    return vertex.getVertexStatus();
+    return vertex.getVertexStatus(statusOptions);
   }
 
 
@@ -1032,10 +1037,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   /**
    * Set the terminationCause and send a kill-message to all vertices.
    * The vertex-kill messages are only sent once.
-   * @param the trigger that is causing the DAG to transition to KILLED/FAILED
-   * @param event The type of kill event to send to the vertices.
    */
-  void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) {
+  void enactKill(DAGTerminationCause dagTerminationCause,
+      VertexTerminationCause vertexTerminationCause) {
 
     if(trySetTerminationCause(dagTerminationCause)){
       for (Vertex v : vertices.values()) {
@@ -1100,7 +1104,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         if (!job.reRunningVertices.contains(vertex.getVertexId())) {
-          // vertex succeeded for the first time 
+          // vertex succeeded for the first time
           job.dagScheduler.vertexCompleted(vertex);
         }
         job.vertexSucceeded(vertex);
@@ -1117,7 +1121,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         job.vertexKilled(vertex);
         forceTransitionToKillWait = true;
       }
-      
+
       job.reRunningVertices.remove(vertex.getVertexId());
 
       LOG.info("Vertex " + vertex.getVertexId() + " completed."

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3d4703b..8a587c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -109,11 +109,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private long scheduledTime;
 
   protected TaskLocationHint locationHint;
-  
+
   private List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
-  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS = 
+  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
       new ArrayList(0);
-  
+
 
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
@@ -162,8 +162,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
         TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
 
-    // When current attempt fails/killed and new attempt launched then 
-    // TODO Task should go back to SCHEDULED state TEZ-495 
+    // When current attempt fails/killed and new attempt launched then
+    // TODO Task should go back to SCHEDULED state TEZ-495
     // Transitions from RUNNING state
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
@@ -444,7 +444,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       readLock.unlock();
     }
   }
-  
+
   @Override
   public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int maxEvents) {
@@ -464,39 +464,39 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         events = Collections.unmodifiableList(new ArrayList<TezEvent>(
             tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
-            + "-" + toEventId + ")"); 
-        // currently not modifying the events so that we dont have to create 
+            + "-" + toEventId + ")");
+        // currently not modifying the events so that we dont have to create
         // copies of events. e.g. if we have to set taskAttemptId into the TezEvent
         // destination metadata then we will need to create a copy of the TezEvent
-        // and then modify the metadata and then send the copy on the RPC. This 
-        // is important because TezEvents are only routed in the AM and not copied 
-        // during routing. So e.g. a broadcast edge will send the same event to 
-        // all consumers (like it should). If copies were created then re-routing 
-        // the events on parallelism changes would be difficult. We would have to 
-        // buffer the events in the Vertex until the parallelism was set and then 
+        // and then modify the metadata and then send the copy on the RPC. This
+        // is important because TezEvents are only routed in the AM and not copied
+        // during routing. So e.g. a broadcast edge will send the same event to
+        // all consumers (like it should). If copies were created then re-routing
+        // the events on parallelism changes would be difficult. We would have to
+        // buffer the events in the Vertex until the parallelism was set and then
         // route the events.
       }
       return events;
     } finally {
       readLock.unlock();
-    }    
+    }
   }
-  
-  @Override 
+
+  @Override
   public List<TezEvent> getAndClearTaskTezEvents() {
     readLock.lock();
     try {
       List<TezEvent> events = tezEventsForTaskAttempts;
-      tezEventsForTaskAttempts = new ArrayList<TezEvent>(); 
+      tezEventsForTaskAttempts = new ArrayList<TezEvent>();
       return events;
     } finally {
       readLock.unlock();
-    }        
+    }
   }
 
   @Override
   public List<String> getDiagnostics() {
-    List<String> diagnostics = new ArrayList<String>(5);
+    List<String> diagnostics = new ArrayList<String>(attempts.size());
     readLock.lock();
     try {
       for (TaskAttempt att : attempts.values()) {
@@ -616,7 +616,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         if (ta == null) {
           throw new TezUncheckedException("Unknown task for commit: " + taskAttemptID);
         }
-        // Its ok to get a non-locked state snapshot since we handle changes of 
+        // Its ok to get a non-locked state snapshot since we handle changes of
         // state in the task attempt. Dont want to deadlock here.
         TaskAttemptState taState = ta.getStateNoLock();
         if (taState == TaskAttemptState.RUNNING) {
@@ -624,7 +624,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           LOG.info(taskAttemptID + " given a go for committing the task output.");
           return true;
         } else {
-          LOG.info(taskAttemptID + " with state: " + taState + 
+          LOG.info(taskAttemptID + " with state: " + taState +
               " given a no-go for commit because its not running.");
           return false;
         }
@@ -636,14 +636,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // Don't think this can be a pluggable decision, so simply raise an
         // event for the TaskAttempt to delete its output.
         // Wait for commit attempt to succeed. Dont kill this. If commit
-        // attempt fails then choose a different committer. When commit attempt 
+        // attempt fails then choose a different committer. When commit attempt
         // succeeds then this and others will be killed
         LOG.info(commitAttempt
             + " is current committer. Commit waiting for:  "
             + taskAttemptID);
         return false;
       }
-    
+
     } finally {
       writeLock.unlock();
     }
@@ -654,7 +654,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean needsWaitAfterOutputConsumable() {
     Vertex vertex = getVertex();
     ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
-    if (processorDescriptor != null && 
+    if (processorDescriptor != null &&
         processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
       return true;
     } else {
@@ -929,17 +929,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID();
-      
-      if (task.commitAttempt != null && 
+
+      if (task.commitAttempt != null &&
           !task.commitAttempt.equals(successTaId)) {
         // The succeeded attempt is not the one that was selected to commit
         // This is impossible and has to be a bug
-        throw new TezUncheckedException("TA: " + successTaId 
-            + " succeeded but TA: " + task.commitAttempt 
+        throw new TezUncheckedException("TA: " + successTaId
+            + " succeeded but TA: " + task.commitAttempt
             + " was expected to commit and succeed");
       }
-      
-      task.handleTaskAttemptCompletion(successTaId, 
+
+      task.handleTaskAttemptCompletion(successTaId,
           TaskAttemptStateInternal.SUCCEEDED);
       task.finishedAttempts++;
       --task.numberUncompletedAttempts;
@@ -976,7 +976,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
-      if (task.commitAttempt !=null && 
+      if (task.commitAttempt !=null &&
           castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
@@ -1028,7 +1028,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
-      if (task.commitAttempt != null && 
+      if (task.commitAttempt != null &&
           castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
@@ -1096,17 +1096,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       //  fails, we have to let AttemptFailedTransition.transition
       //  believe that there's no redundancy.
       unSucceed(task);
-      
+
       // fake values for code for super.transition
       ++task.numberUncompletedAttempts;
       task.finishedAttempts--;
-      TaskStateInternal returnState = super.transition(task, event);      
-      
+      TaskStateInternal returnState = super.transition(task, event);
+
       if (returnState == TaskStateInternal.SCHEDULED) {
         // tell the dag about the rescheduling
-        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));        
+        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
       }
-      
+
       return returnState;
     }
 
@@ -1184,8 +1184,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           logMsg));
     }
   }
-  
-  private static class AddTezEventTransition 
+
+  private static class AddTezEventTransition
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index df99bb7..257984f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.committer.NullVertexOutputCommitter;
@@ -240,7 +241,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITIALIZING, VertexState.ERROR,
               VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-              
+
           // Transitions from INITED state
               // SOURCE_VERTEX_STARTED - for srces which detemrine parallelism, they must complete before this vertex can start.
           .addTransition(VertexState.INITED, VertexState.INITED,
@@ -318,8 +319,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.SUCCEEDED,
               VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          .addTransition(VertexState.SUCCEEDED, 
-              EnumSet.of(VertexState.RUNNING, VertexState.FAILED), 
+          .addTransition(VertexState.SUCCEEDED,
+              EnumSet.of(VertexState.RUNNING, VertexState.FAILED),
               VertexEventType.V_TASK_RESCHEDULED,
               new TaskRescheduledAfterVertexSuccessTransition())
 
@@ -329,7 +330,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   // after we are done reruns of source tasks should not affect
                   // us. These reruns may be triggered by other consumer vertices.
-                  // We should have been in RUNNING state if we had triggered the 
+                  // We should have been in RUNNING state if we had triggered the
                   // reruns.
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   // accumulate these in case we get restarted
@@ -432,9 +433,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
   private RootInputInitializerRunner rootInputInitializer;
-  
+
   private VertexScheduler vertexScheduler;
-  
+
   private boolean parallelismSet = false;
 
   private VertexOutputCommitter committer;
@@ -558,10 +559,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     try {
       // does it matter to create a duplicate list for efficiency
       // instead of traversing the map
-      // local assign to LinkedHashMap to ensure that sequential traversal 
+      // local assign to LinkedHashMap to ensure that sequential traversal
       // assumption is satisfied
       LinkedHashMap<TezTaskID, Task> taskList = tasks;
-      int i=0; 
+      int i=0;
       for(Map.Entry<TezTaskID, Task> entry : taskList.entrySet()) {
         if(taskIndex == i) {
           return entry.getValue();
@@ -681,13 +682,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public VertexStatusBuilder getVertexStatus() {
+  public VertexStatusBuilder getVertexStatus(
+      Set<StatusGetOpts> statusOptions) {
     this.readLock.lock();
     try {
       VertexStatusBuilder status = new VertexStatusBuilder();
       status.setState(getInternalState());
       status.setDiagnostics(diagnostics);
       status.setProgress(getVertexProgress());
+      if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+        status.setVertexCounters(getAllCounters());
+      }
       return status;
     } finally {
       this.readLock.unlock();
@@ -752,7 +757,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  // TODO Create InputReadyVertexManager that schedules when there is something 
+  // TODO Create InputReadyVertexManager that schedules when there is something
   // to read and use that as default instead of ImmediateStart.TEZ-480
   @Override
   public void scheduleTasks(Collection<TezTaskID> taskIDs) {
@@ -775,7 +780,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Preconditions.checkState(parallelismSet == false,
           "Parallelism can only be set dynamically once per vertex");
       parallelismSet = true;
-      
+
       // Input initializer expected to set parallelism.
       if (numTasks == -1) {
         Preconditions
@@ -789,7 +794,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // INITIALIZING state.
         return;
       }
-      
+
       if (parallelism >= numTasks) {
         // not that hard to support perhaps. but checking right now since there
         // is no use case for it and checking may catch other bugs.
@@ -800,16 +805,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Ingoring setParallelism to current value: " + parallelism);
         return;
       }
-      
+
       // start buffering incoming events so that we can re-route existing events
       for (Edge edge : sourceVertices.values()) {
         edge.startEventBuffering();
       }
-      
+
       // Use a set since the same event may have been sent to multiple tasks
       // and we want to avoid duplicates
       Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-      
+
       LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
       // assign to local variable of LinkedHashMap to make sure that changing
       // type of task causes compile error. We depend on LinkedHashMap for order
@@ -842,27 +847,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           Vertex sourceVertex = entry.getKey();
           EdgeManager edgeManager = entry.getValue();
           Edge edge = sourceVertices.get(sourceVertex);
-          LOG.info("Replacing edge manager for source:" 
+          LOG.info("Replacing edge manager for source:"
               + sourceVertex.getVertexId() + " destination: " + getVertexId());
           edge.setEdgeManager(edgeManager);
         }
       }
-      
+
       // Re-route all existing TezEvents according to new routing table
-      // At this point only events attributed to source task attempts can be 
-      // re-routed. e.g. DataMovement or InputFailed events.  
+      // At this point only events attributed to source task attempts can be
+      // re-routed. e.g. DataMovement or InputFailed events.
       // This assumption is fine for now since these tasks haven't been started.
-      // So they can only get events generated from source task attempts that 
+      // So they can only get events generated from source task attempts that
       // have already been started.
       DAG dag = getDAG();
       for(TezEvent event : pendingEvents) {
         TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
-            .getTaskID().getVertexID(); 
+            .getTaskID().getVertexID();
         Vertex sourceVertex = dag.getVertex(sourceVertexId);
         Edge sourceEdge = sourceVertices.get(sourceVertex);
         sourceEdge.sendTezEventToDestinationTasks(event);
       }
-      
+
       // stop buffering events
       for (Edge edge : sourceVertices.values()) {
         edge.stopEventBuffering();
@@ -871,7 +876,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     } finally {
       writeLock.unlock();
     }
-    
+
   }
 
   public void setVertexLocationHint(VertexLocationHint vertexLocationHint) {
@@ -1104,7 +1109,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return finalState;
   }
-  
+
   VertexState finished(VertexState finalState) {
     return finished(finalState, null);
   }
@@ -1114,7 +1119,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // Answer: Do commit for every vertex
     // for now, only for leaf vertices
     // TODO TEZ-41 make commmitter type configurable per vertex
-    
+
     if (!this.additionalOutputSpecs.isEmpty()) {
       committer = new MRVertexOutputCommitter();
     }
@@ -1138,7 +1143,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     return VertexState.INITED;
   }
-  
+
   /**
    * If the number of tasks are greater than the configured value
    * throw an exception that will fail job initialization
@@ -1146,7 +1151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void checkTaskLimits() {
     // no code, for now
   }
-  
+
   private void createTasks() {
     Configuration conf = this.conf;
     boolean useNullLocationHint = true;
@@ -1181,7 +1186,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
   }
-  
+
   public static class InitTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -1259,7 +1264,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
 
       vertex.checkTaskLimits();
-      
+
       // Create tasks based on initial configuration, but don't start them yet.
       if (vertex.numTasks == -1) {
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
@@ -1277,11 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         float waves = vertex.conf.getFloat(
             TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
             TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-        
+
         int numTasks = (int)((totalResource*waves)/taskResource);
-        
-        LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks 
-            + " tasks. Headroom: " + totalResource + " Task Resource: " 
+
+        LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
+            + " tasks. Headroom: " + totalResource + " Task Resource: "
             + taskResource + " waves: " + waves);
 
         vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
@@ -1301,7 +1306,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return vertex.initializeVertex();
     }
   } // end of InitTransition
-  
+
   @VisibleForTesting
   protected RootInputInitializerRunner createRootInputInitializerRunner(
       String dagName, String vertexName, TezVertexID vertexID,
@@ -1309,7 +1314,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return new RootInputInitializerRunner(dagName, vertexName, vertexID,
         eventHandler, numTasks);
   }
-  
+
   public static class RootInputInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -1388,9 +1393,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       vertex.startTimeRequested = vertex.clock.getTime();
       vertex.startSignalPending = true;
     }
-    
+
   }
-  
+
   public static class StartTransition
   implements SingleArcTransition<VertexImpl, VertexEvent> {
     /**
@@ -1409,10 +1414,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     vertexScheduler.onVertexStarted(pendingReportedSrcCompletions);
     pendingReportedSrcCompletions.clear();
     logJobHistoryVertexStartedEvent();
-    
+
     // TODO: Metrics
     //job.metrics.runningJob(job);
-    
+
     // default behavior is to start immediately. so send information about us
     // starting to downstream vertices. If the connections/structure of this
     // vertex is not fully defined yet then we could send this event later
@@ -1557,7 +1562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + " attempt: " + completionEvent.getTaskAttemptId()
             + " with state: " + completionEvent.getTaskAttemptState()
             + " vertexState: " + vertex.getState());
-      
+
       if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent
           .getTaskAttemptState())) {
         vertex.numSuccessSourceAttemptCompletions++;
@@ -1675,25 +1680,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId()));
         return VertexState.RUNNING;
       }
-      
+
       LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of "
           + ((VertexEventTaskReschedule)event).getTaskID());
       // terminate any running tasks
       vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE,
           TaskTerminationCause.OWN_TASK_FAILURE);
-      // since the DAG thinks this vertex is completed it must be notified of 
+      // since the DAG thinks this vertex is completed it must be notified of
       // an error
       vertex.eventHandler.handle(new DAGEvent(vertex.getDAGId(),
           DAGEventType.INTERNAL_ERROR));
       return VertexState.FAILED;
     }
   }
-  
+
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }
-  
-  private static boolean isEventFromVertex(Vertex vertex, 
+
+  private static boolean isEventFromVertex(Vertex vertex,
       EventMetaData sourceMeta) {
     if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
       return false;
@@ -1701,7 +1706,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return true;
   }
 
-  private static void checkEventSourceMetadata(Vertex vertex, 
+  private static void checkEventSourceMetadata(Vertex vertex,
       EventMetaData sourceMeta) {
     if (!isEventFromVertex(vertex, sourceMeta)) {
         throw new TezUncheckedException("Bad routing of event"
@@ -1754,7 +1759,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
               Edge destEdge = vertex.targetVertices.get(destVertex);
               if (destEdge == null) {
-                throw new TezUncheckedException("Bad destination vertex: " + 
+                throw new TezUncheckedException("Bad destination vertex: " +
                     sourceMeta.getEdgeVertexName() + " for event vertex: " +
                     vertex.getVertexId());
               }
@@ -1766,7 +1771,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
                   sourceMeta.getTaskVertexName()));
               if (srcEdge == null) {
-                throw new TezUncheckedException("Bad source vertex: " + 
+                throw new TezUncheckedException("Bad source vertex: " +
                     sourceMeta.getTaskVertexName() + " for destination vertex: " +
                     vertex.getVertexId());
               }
@@ -1830,7 +1835,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
     }
   }
-  
+
   private static class InternalErrorTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
@@ -1861,7 +1866,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         "For now, only a single root input can be specified on a Vertex");
     this.additionalInputs = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
-      
+
       InputDescriptor id = DagTypeConverters
           .convertInputDescriptorFromDAGPlan(input.getEntityDescriptor());
 
@@ -1872,7 +1877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       InputSpec inputSpec = new InputSpec(input.getName(), id, 0);
       additionalInputSpecs.add(inputSpec);
     }
-    
+
   }
 
   @Override
@@ -1880,7 +1885,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     LOG.info("setting additional outputs for vertex " + this.vertexName);
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {
-      
+
       OutputDescriptor od = DagTypeConverters
           .convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 5299431..23f5c72 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -18,11 +18,20 @@
 
 package org.apache.tez.mapreduce.examples;
 
+import java.io.IOException;
 import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
 
 import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.mapreduce.examples.terasort.TeraGen;
 import org.apache.tez.mapreduce.examples.terasort.TeraSort;
 import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
@@ -85,7 +94,17 @@ public class ExampleDriver {
     System.exit(exitCode);
   }
 
-  public static void printMRRDAGStatus(DAGStatus dagStatus) {
+  public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+      throws IOException, TezException {
+    printDAGStatus(dagClient, vertexNames, false, false);
+  }
+
+  public static void printDAGStatus(DAGClient dagClient, String[] vertexNames,
+      boolean displayDAGCounters, boolean displayVertexCounters)
+      throws IOException, TezException {
+    Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    DAGStatus dagStatus = dagClient.getDAGStatus(
+      (displayDAGCounters ? opts : null));
     Progress progress = dagStatus.getDAGProgress();
     double vProgressFloat = 0.0f;
     if (progress != null) {
@@ -96,9 +115,10 @@ public class ExampleDriver {
           + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
             formatter.format((double)(progress.getSucceededTaskCount())
               /progress.getTotalTaskCount())));
-      final String[] vNames = { "initialmap", "ivertex1", "finalreduce" };
-      for (String vertexName : vNames) {
-        Progress vProgress = dagStatus.getVertexProgress().get(vertexName);
+      for (String vertexName : vertexNames) {
+        VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+          (displayVertexCounters ? opts : null));
+        Progress vProgress = vStatus.getProgress();
         if (vProgress != null) {
           vProgressFloat = 0.0f;
           if (vProgress.getTotalTaskCount() == 0) {
@@ -113,6 +133,19 @@ public class ExampleDriver {
                   : vertexName)
               + " Progress: " + formatter.format(vProgressFloat));
         }
+        if (displayVertexCounters) {
+          TezCounters counters = vStatus.getVertexCounters();
+          if (counters != null) {
+            System.out.println("Vertex Counters for " + vertexName + ": "
+              + counters);
+          }
+        }
+      }
+    }
+    if (displayDAGCounters) {
+      TezCounters counters = dagStatus.getDAGCounters();
+      if (counters != null) {
+        System.out.println("DAG Counters: " + counters);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index bd032e2..a73b3fc 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -93,7 +93,7 @@ public class FilterLinesByWord {
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
     boolean generateSplitsInClient = false;
-    
+
     SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
     try {
       generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
@@ -196,7 +196,7 @@ public class FilterLinesByWord {
     Map<String, String> stage1Env = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
     stage1Vertex.setTaskEnvironment(stage1Env);
-    
+
     // Configure the Input for stage1
     Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
@@ -214,7 +214,7 @@ public class FilterLinesByWord {
     Map<String, String> stage2Env = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
     stage2Vertex.setTaskEnvironment(stage2Env);
-    
+
     // Configure the Output for stage2
     stage2Vertex.addOutput("MROutput",
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
@@ -233,9 +233,10 @@ public class FilterLinesByWord {
     LOG.info("Submitted DAG to Tez Session");
 
     DAGStatus dagStatus = null;
+    String[] vNames = { "stage1", "stage2" };
     try {
       while (true) {
-        dagStatus = dagClient.getDAGStatus();
+        dagStatus = dagClient.getDAGStatus(null);
         if(dagStatus.getState() == DAGStatus.State.RUNNING ||
             dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
             dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -252,13 +253,13 @@ public class FilterLinesByWord {
 
       while (dagStatus.getState() == DAGStatus.State.RUNNING) {
         try {
-          ExampleDriver.printMRRDAGStatus(dagStatus);
+          ExampleDriver.printDAGStatus(dagClient, vNames);
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e) {
             // continue;
           }
-          dagStatus = dagClient.getDAGStatus();
+          dagStatus = dagClient.getDAGStatus(null);
         } catch (TezException e) {
           LOG.fatal("Failed to get application progress. Exiting");
           System.exit(-1);
@@ -269,24 +270,24 @@ public class FilterLinesByWord {
       tezSession.stop();
     }
 
-    ExampleDriver.printMRRDAGStatus(dagStatus);
+    ExampleDriver.printDAGStatus(dagClient, vNames);
     LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
     System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
   }
-  
+
   public static class TextLongPair implements Writable {
 
     private Text text;
     private LongWritable longWritable;
-    
+
     public TextLongPair() {
     }
-    
+
     public TextLongPair(Text text, LongWritable longWritable) {
       this.text = text;
       this.longWritable = longWritable;
     }
-    
+
     @Override
     public void write(DataOutput out) throws IOException {
       this.text.write(out);
@@ -300,10 +301,10 @@ public class FilterLinesByWord {
       text.readFields(in);
       longWritable.readFields(in);
     }
-    
+
     @Override
     public String toString() {
       return text.toString() + "\t" + longWritable.get();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
index 29d6db5..60ce3da 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -197,9 +197,10 @@ public class GroupByOrderByMRRTest {
     ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
 
     DAGClient dagClient = tezClient.getDAGClient(appId);
-    DAGStatus dagStatus = null;
+    DAGStatus dagStatus;
+    String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
     while (true) {
-      dagStatus = dagClient.getDAGStatus();
+      dagStatus = dagClient.getDAGStatus(null);
       if(dagStatus.getState() == DAGStatus.State.RUNNING ||
          dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
          dagStatus.getState() == DAGStatus.State.FAILED ||
@@ -216,20 +217,20 @@ public class GroupByOrderByMRRTest {
 
     while (dagStatus.getState() == DAGStatus.State.RUNNING) {
       try {
-        ExampleDriver.printMRRDAGStatus(dagStatus);
+        ExampleDriver.printDAGStatus(dagClient, vNames);
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
           // continue;
         }
-        dagStatus = dagClient.getDAGStatus();
+        dagStatus = dagClient.getDAGStatus(null);
       } catch (TezException e) {
         LOG.fatal("Failed to get application progress. Exiting");
         System.exit(-1);
       }
     }
 
-    ExampleDriver.printMRRDAGStatus(dagStatus);
+    ExampleDriver.printDAGStatus(dagClient, vNames);
     LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
     System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 8943dfa..edea15b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -834,7 +834,7 @@ public class MRRSleepJob extends Configured implements Tool {
         tezClient.submitDAGApplication(appId, dag, amConfig);
 
     while (true) {
-      DAGStatus status = dagClient.getDAGStatus();
+      DAGStatus status = dagClient.getDAGStatus(null);
       LOG.info("DAG Status: " + status);
       if (status.isCompleted()) {
         break;


Mime
View raw message