tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] tez git commit: TEZ-2234. Add API for statistics information - allow vertex managers to get output size per source vertex (bikas)
Date Sat, 11 Apr 2015 05:52:55 GMT
TEZ-2234. Add API for statistics information - allow vertex managers to get output size per source vertex (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bd9b8d95
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bd9b8d95
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bd9b8d95

Branch: refs/heads/master
Commit: bd9b8d9516a127c98fc9b987fae84898e4207145
Parents: c8ef244
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Apr 10 22:52:11 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Apr 10 22:52:11 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tez/dag/api/VertexManagerPluginContext.java |  21 +++
 .../apache/tez/runtime/api/InputContext.java    |   7 +
 .../apache/tez/runtime/api/InputStatistics.java |  40 +++++
 .../runtime/api/InputStatisticsReporter.java    |  37 +++++
 .../apache/tez/runtime/api/OutputContext.java   |   7 +
 .../tez/runtime/api/OutputStatistics.java       |  41 +++++
 .../runtime/api/OutputStatisticsReporter.java   |  37 +++++
 .../tez/runtime/api/VertexStatistics.java       |  58 +++++++
 .../java/org/apache/tez/dag/app/dag/Task.java   |   1 -
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   5 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   8 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  21 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 128 ++++++++++++++-
 .../tez/dag/app/dag/impl/VertexManager.java     |  20 ++-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  11 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 156 +++++++++++++++++++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   4 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  11 +-
 .../api/events/TaskStatusUpdateEvent.java       |  17 +-
 .../tez/runtime/api/impl/IOStatistics.java      |  30 ++++
 .../tez/runtime/api/impl/TaskStatistics.java    |  71 +++++++++
 .../runtime/api/impl/TezInputContextImpl.java   |  28 +++-
 .../runtime/api/impl/TezOutputContextImpl.java  |  26 +++-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |   5 +-
 .../apache/tez/runtime/task/TaskReporter.java   |  18 ++-
 .../library/input/OrderedGroupedKVInput.java    |   5 +
 .../runtime/library/input/UnorderedKVInput.java |   5 +
 .../output/OrderedPartitionedKVOutput.java      |  12 +-
 .../library/output/UnorderedKVOutput.java       |  11 +-
 .../output/UnorderedPartitionedKVOutput.java    |  11 +-
 .../library/output/TestOnFileSortedOutput.java  |  17 +-
 .../output/TestOnFileUnorderedKVOutput.java     |  18 ++-
 34 files changed, 843 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d1127f..e12edaa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2234. Add API for statistics information - allow vertex managers to get
+  output size per source vertex
   TEZ-2274. Tez UI: full data loading, client side search and sort for other pages
   TEZ-2301. Switch Tez Pre-commit builds to use tezqa user.
   TEZ-2299. Invalid dag creation in MRRSleepJob post TEZ-2293.

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 38ecbf6..ab4ced0 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 
 import com.google.common.base.Preconditions;
@@ -60,6 +61,7 @@ public interface VertexManagerPluginContext {
       return locationHint;
     }
   }
+  
   /**
    * Get the edge properties on the input edges of this vertex. The input edge 
    * is represented by the source vertex name
@@ -68,6 +70,25 @@ public interface VertexManagerPluginContext {
   public Map<String, EdgeProperty> getInputVertexEdgeProperties();
   
   /**
+   * Get the edge properties on the output edges of this vertex. Each output edge
+   * is represented by the name of its destination vertex.
+   * @return Map from destination vertex name to edge property
+   */
+  public Map<String, EdgeProperty> getOutputVertexEdgeProperties();
+  
+  /**
+   * Get a {@link VertexStatistics} object to find out execution statistics
+   * about the given {@link Vertex}.
+   * <br>This only provides point in time values for the statistics; call this
+   * method again to get updated values.
+   *
+   * @param vertexName
+   *          Name of the {@link Vertex}
+   * @return {@link VertexStatistics} for the given vertex
+   */
+  public VertexStatistics getVertexStatistics(String vertexName);
+
+  /**
    * Get the name of the vertex
    * @return Vertex name
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
index fc02878..479a7db 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
@@ -46,4 +46,11 @@ public interface InputContext extends TaskContext {
    * This method can be invoked multiple times.
    */
   public void inputIsReady();
+  
+  /**
+   * Get an {@link InputStatisticsReporter} for this {@link Input} that can
+   * be used to report statistics, such as data size, about this input.
+   * @return the {@link InputStatisticsReporter} for this input
+   */
+  public InputStatisticsReporter getStatisticsReporter();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
new file mode 100644
index 0000000..fb99f2d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.Vertex;
+
+/**
+ * Provides various statistics about physical execution activity happening on a
+ * logical input in a {@link Vertex}. Inputs can be external inputs or inputs
+ * from other vertices.
+ */
+@Public
+@Evolving
+public interface InputStatistics {
+
+  /**
+   * Returns the data size associated with this logical input.
+   * <br>It is the size of the data read from this input by the vertex.
+   * @return Data size in bytes
+   */
+  public long getDataSize();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
new file mode 100644
index 0000000..68a56e7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Reports statistics, such as the size of the data read, about an {@link Input}.
+ */
+@Public
+@Evolving
+public interface InputStatisticsReporter {
+
+  /**
+   * Report the size of the logical data read.
+   * @param size size of the data read, in bytes
+   */
+  public void reportDataSize(long size);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
index a0c7194..882eb4b 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -40,5 +40,12 @@ public interface OutputContext extends TaskContext {
    * @return index
    */
   public int getOutputIndex();
+  
+  /**
+   * Get an {@link OutputStatisticsReporter} for this {@link Output} that can
+   * be used to report statistics, such as data size, about this output.
+   * @return the {@link OutputStatisticsReporter} for this output
+   */
+  public OutputStatisticsReporter getStatisticsReporter();
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
new file mode 100644
index 0000000..0373606
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.api;
+
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.Vertex;
+
+/**
+ * Provides various statistics about physical execution activity happening on a
+ * logical output in a {@link Vertex}. Outputs can be external outputs or
+ * outputs to other vertices.
+ */
+@Public
+@Evolving
+public interface OutputStatistics {
+
+  /**
+   * Returns the data size associated with this logical output.
+   * <br>It is the size of the data written to this output by the vertex.
+   * @return Data size in bytes
+   */
+  public long getDataSize();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
new file mode 100644
index 0000000..fc9f1b7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Reports statistics, such as the size of the data written, about an {@link Output}.
+ */
+@Public
+@Evolving
+public interface OutputStatisticsReporter {
+
+  /**
+   * Report the size of the logical data written.
+   * @param size size of the data written, in bytes
+   */
+  public void reportDataSize(long size);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java
new file mode 100644
index 0000000..aa526f2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexStatistics.java
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.Vertex;
+
+/**
+ * Provides various statistics about the physical execution of a
+ * {@link Vertex}.<br>
+ * This only provides point in time values for the statistics and values are
+ * refreshed based on when the implementations of the inputs/outputs/tasks etc.
+ * update their reported statistics. The values may increase or decrease based
+ * on task completions or failures.
+ */
+@Public
+@Evolving
+public interface VertexStatistics {
+
+  /**
+   * Get statistics about an {@link Edge} input or external input of this
+   * {@link Vertex}.<br>
+   *
+   * @param inputName
+   *          Name of the source vertex of an input {@link Edge}, or name of an external input
+   * @return {@link InputStatistics} for the given input
+   */
+  public InputStatistics getInputStatistics(String inputName);
+
+  /**
+   * Get statistics about an {@link Edge} output or external output of this
+   * {@link Vertex}.<br>
+   *
+   * @param outputName
+   *          Name of the destination vertex of an output {@link Edge}, or name of an external output
+   * @return {@link OutputStatistics} for the given output
+   */
+  public OutputStatistics getOutputStatistics(String outputName);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 98a85cf..b798fce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -41,7 +41,6 @@ public interface Task {
   Map<TezTaskAttemptID, TaskAttempt> getAttempts();
   TaskAttempt getAttempt(TezTaskAttemptID attemptID);
   TaskAttempt getSuccessfulAttempt();
-
   /** Has Task reached the final state or not.
    */
   boolean isFinished();

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 44df6cb..6c85b85 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +53,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -66,6 +68,7 @@ public interface Vertex extends Comparable<Vertex> {
   public VertexPlan getVertexPlan();
 
   int getDistanceFromRoot();
+  LinkedHashMap<String, Integer> getIOIndices();
   String getName();
   VertexState getState();
 
@@ -107,6 +110,8 @@ public interface Vertex extends Comparable<Vertex> {
   void setInputVertices(Map<Vertex, Edge> inVertices);
   void setOutputVertices(Map<Vertex, Edge> outVertices);
 
+  VertexStatistics getStatistics();
+  
   Map<Vertex, Edge> getInputVertices();
   Map<Vertex, Edge> getOutputVertices();
   

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 4f92fa6..1af4274 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -103,6 +103,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
@@ -147,6 +148,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   @VisibleForTesting
   TaskAttemptStatus reportedStatus;
   private DAGCounter localityCounter;
+  
+  volatile TaskStatistics statistics; // replaced by status updates, read lock-free via getStatistics()
 
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
@@ -517,6 +520,10 @@ public class TaskAttemptImpl implements TaskAttempt,
       readLock.unlock();
     }
   }
+  
+  TaskStatistics getStatistics() {
+    return this.statistics; // latest statistics from a status update; may be null (e.g. before any update)
+  }
 
   @Override
   public float getProgress() {
@@ -1259,6 +1266,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.reportedStatus.state = ta.getState();
       ta.reportedStatus.progress = statusEvent.getProgress();
       ta.reportedStatus.counters = statusEvent.getCounters();
+      ta.statistics = statusEvent.getStatistics();
 
       ta.updateProgressSplits();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 8290cf5..10a688f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -92,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -140,7 +141,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private boolean historyTaskStartGenerated = false;
 
-  private static final SingleArcTransition<TaskImpl, TaskEvent>
+  private static final SingleArcTransition<TaskImpl, TaskEvent>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
@@ -149,7 +150,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   boolean recoveryStartEventSeen = false;
 
   private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
-
+  
   private static final StateMachineFactory
                <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
             stateMachineFactory
@@ -349,7 +350,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         stateMachineFactory.make(this), this);
     augmentStateMachine();
   }
-
+  
   @Override
   public Map<TezTaskAttemptID, TaskAttempt> getAttempts() {
     readLock.lock();
@@ -433,6 +434,20 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       readLock.unlock();
     }
   }
+  
+  TaskStatistics getStatistics() {
+    // statistics are not merged across attempts: only the best attempt's
+    // statistics are returned, or null when there is no best attempt
+    readLock.lock();
+    try {
+      TaskAttempt best = selectBestAttempt();
+      return best == null
+          ? null
+          : ((TaskAttemptImpl) best).getStatistics();
+    } finally {
+      readLock.unlock();
+    }
+  }
 
   @Override
   public float getProgress() {

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 2d892b0..82f7a33 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -150,9 +150,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
+import org.apache.tez.runtime.api.InputStatistics;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.OutputStatistics;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -166,6 +169,7 @@ import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -660,6 +664,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   Map<Vertex, Edge> sourceVertices;
   private Map<Vertex, Edge> targetVertices;
   Set<Edge> uninitializedEdges = Sets.newHashSet();
+  // using a linked hash map to conveniently map edge names to a contiguous index
+  LinkedHashMap<String, Integer> ioIndices = Maps.newLinkedHashMap();
 
   private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
     rootInputDescriptors;
@@ -718,6 +724,62 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
 
+  private VertexStatisticsImpl finalStatistics;
+
+  
+  static class IOStatisticsImpl extends org.apache.tez.runtime.api.impl.IOStatistics
+    implements InputStatistics, OutputStatistics {
+    // exposes the inherited data size through InputStatistics and OutputStatistics
+    @Override
+    public long getDataSize() {
+      return super.getDataSize();
+    }
+    // adds another IO's data size to the size accumulated here
+    void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) {
+      long combined = getDataSize() + other.getDataSize();
+      setDataSize(combined);
+    }
+  }
+
+  class VertexStatisticsImpl implements VertexStatistics {
+    final Map<String, IOStatisticsImpl> ioStats; // one entry per IO name in ioIndices
+    public VertexStatisticsImpl() {
+      ioStats = Maps.newHashMapWithExpectedSize(ioIndices.size());
+      for (String name : ioIndices.keySet()) {
+        ioStats.put(name, new IOStatisticsImpl());
+      }
+    }
+
+    public IOStatisticsImpl getIOStatistics(String ioName) {
+      return ioStats.get(ioName); // null if ioName is not an IO of this vertex
+    }
+
+    /** Adds one task's per-IO statistics into this aggregate; null stats are ignored. */
+    void mergeFrom(TaskStatistics taskStats) {
+      if (taskStats == null) {
+        return;
+      }
+      for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats.getIOStatistics().entrySet()) {
+        String edgeName = entry.getKey();
+        IOStatisticsImpl myEdgeStat = ioStats.get(edgeName);
+        // template form: the error message is only formatted when the check fails
+        Preconditions.checkState(myEdgeStat != null, "Unexpected IO name: %s for vertex:%s",
+            edgeName, getLogIdentifier());
+        myEdgeStat.mergeFrom(entry.getValue());
+      }
+    }
+
+    @Override
+    public InputStatistics getInputStatistics(String inputName) {
+      return getIOStatistics(inputName);
+    }
+
+    @Override
+    public OutputStatistics getOutputStatistics(String outputName) {
+      return getIOStatistics(outputName);
+    }
+  }
+  
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration dagConf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
@@ -855,6 +917,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public int getDistanceFromRoot() {
     return distanceFromRoot;
   }
+  
+  @Override
+  public LinkedHashMap<String, Integer> getIOIndices() {
+    return ioIndices; // the live internal map (not a copy): IO name -> contiguous index
+  }
 
   @Override
   public String getName() {
@@ -931,9 +998,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     readLock.lock();
 
     try {
-      VertexState state = getInternalState();
-      if (state == VertexState.ERROR || state == VertexState.FAILED
-          || state == VertexState.KILLED || state == VertexState.SUCCEEDED) {
+      if (inTerminalState()) {
         this.mayBeConstructFinalFullCounters();
         return fullCounters;
       }
@@ -950,9 +1015,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     readLock.lock();
     try {
-      VertexState state = getInternalState();
-      if (state == VertexState.ERROR || state == VertexState.FAILED
-          || state == VertexState.KILLED || state == VertexState.SUCCEEDED) {
+      if (inTerminalState()) {
         this.mayBeConstructFinalFullCounters();
         return this.vertexStats;
       }
@@ -965,6 +1028,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  boolean inTerminalState() {
+    // ERROR, FAILED, KILLED and SUCCEEDED are treated as terminal vertex states
+    VertexState state = getInternalState();
+    return state == VertexState.ERROR
+        || state == VertexState.FAILED
+        || state == VertexState.KILLED
+        || state == VertexState.SUCCEEDED;
+  }
 
   public static TezCounters incrTaskCounters(
       TezCounters counters, Collection<Task> tasks) {
@@ -1705,6 +1776,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
     if (vertex.completedTaskCount == vertex.tasks.size()) {
+      // finished - gather stats
+      vertex.finalStatistics = vertex.constructStatistics();
+      
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
@@ -2221,7 +2295,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         return VertexState.FAILED;
       }
     }
-
+    
     checkTaskLimits();
     return VertexState.INITED;
   }
@@ -3403,6 +3477,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private void mayBeConstructFinalFullCounters() {
     // Calculating full-counters. This should happen only once for the vertex.
     synchronized (this.fullCountersLock) {
+      // TODO this is broken after rerun
       if (this.fullCounters != null) {
         // Already constructed. Just return.
         return;
@@ -3410,6 +3485,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       this.constructFinalFullcounters();
     }
   }
+  
+  /** Merges the statistics of every task of this vertex into a new aggregate. */
+  private VertexStatisticsImpl constructStatistics() {
+    VertexStatisticsImpl aggregate = new VertexStatisticsImpl();
+    for (Task task : this.tasks.values()) {
+      TaskStatistics perTaskStats = ((TaskImpl) task).getStatistics();
+      aggregate.mergeFrom(perTaskStats);
+    }
+    return aggregate;
+  }
 
   @Private
   public void constructFinalFullcounters() {
@@ -3710,6 +3795,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         (new TaskRescheduledTransition()).transition(vertex, event);
         // inform the DAG that we are re-running
         vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId()));
+        // back to running. so reset final cached stats
+        vertex.finalStatistics = null;
         return VertexState.RUNNING;
       }
 
@@ -4049,18 +4136,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   @Override
   public void setInputVertices(Map<Vertex, Edge> inVertices) {
     this.sourceVertices = inVertices;
+    for (Vertex vertex : sourceVertices.keySet()) { // register each source vertex name via addIO
+      addIO(vertex.getName());
+    }
   }
 
   @Override
   public void setOutputVertices(Map<Vertex, Edge> outVertices) {
     this.targetVertices = outVertices;
+    for (Vertex vertex : targetVertices.keySet()) { // register each target vertex name via addIO
+      addIO(vertex.getName());
+    }
   }
 
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
     this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size());
     for (RootInputLeafOutputProto input : inputs) {
-
+      addIO(input.getName());
       InputDescriptor id = DagTypeConverters
           .convertInputDescriptorFromDAGPlan(input.getIODescriptor());
 
@@ -4106,6 +4199,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {
+      addIO(output.getName());
       OutputDescriptor od = DagTypeConverters
           .convertOutputDescriptorFromDAGPlan(output.getIODescriptor());
 
@@ -4181,6 +4275,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public Map<Vertex, Edge> getOutputVertices() {
     return Collections.unmodifiableMap(this.targetVertices);
   }
+  
+  @Override
+  public VertexStatistics getStatistics() {
+    readLock.lock();
+    try {
+      if (inTerminalState()) { // finished: return the snapshot taken once all tasks completed
+        Preconditions.checkState(this.finalStatistics != null); // NOTE(review): confirm every terminal path (e.g. kill/error before tasks ran) sets finalStatistics
+        return this.finalStatistics;
+      }
+      return constructStatistics(); // not terminal: aggregate the current task statistics
+    } finally {
+      readLock.unlock();
+    }
+  }
 
   @Override
   public int getInputVerticesCount() {
@@ -4214,6 +4322,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       readLock.unlock();
     }
   }
+  
+  void addIO(String name) { // map the interned IO name to ioIndices.size() at registration time
+    ioIndices.put(StringInterner.weakIntern(name), ioIndices.size());
+  }
 
   @VisibleForTesting
   String getProcessorName() {

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 4bf51a1..0be0aaa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -122,7 +123,6 @@ public class VertexManager {
     @Override
     public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() {
       checkAndThrowIfDone();
-      // TODO Something similar for Initial Inputs - payload etc visible
       Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
       Map<String, EdgeProperty> vertexEdgeMap =
                           Maps.newHashMapWithExpectedSize(inputs.size());
@@ -133,6 +133,29 @@ public class VertexManager {
     }
 
     @Override
+    public synchronized Map<String, EdgeProperty> getOutputVertexEdgeProperties() {
+      checkAndThrowIfDone();
+      Map<Vertex, Edge> outputs = managedVertex.getOutputVertices();
+      Map<String, EdgeProperty> vertexEdgeMap =
+                          Maps.newHashMapWithExpectedSize(outputs.size());
+      for (Map.Entry<Vertex, Edge> entry : outputs.entrySet()) {
+        vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
+      }
+      return vertexEdgeMap;
+    }
+
+    @Override
+    public synchronized VertexStatistics getVertexStatistics(String vertexName) {
+      checkAndThrowIfDone();
+      // Reject unknown names explicitly rather than failing with an NPE on a null vertex.
+      Vertex vertex = appContext.getCurrentDAG().getVertex(vertexName);
+      if (vertex == null) {
+        throw new IllegalArgumentException("Unknown vertex name: " + vertexName);
+      }
+      return vertex.getStatistics();
+    }
+
+    @Override
     public synchronized String getVertexName() {
       checkAndThrowIfDone();
       return managedVertex.getName();

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3390b02..fca15fd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -67,6 +67,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
@@ -91,6 +92,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   boolean startFailFlag;
   boolean sendDMEvents;
   CountersDelegate countersDelegate;
+  StatisticsDelegate statsDelegate;
   long launcherSleepTime = 1;
   boolean doSleep = true;
   int handlerConcurrency = 1;
@@ -101,6 +103,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
   AtomicLong heartbeatTime = new AtomicLong(0);
   AtomicLong numHearbeats = new AtomicLong(0);
   
+  public static interface StatisticsDelegate { // test hook supplying per-task statistics to the mock launcher
+    public TaskStatistics getStatistics(TaskSpec taskSpec); // result is sent in each mock TaskStatusUpdateEvent
+  }
   public static interface CountersDelegate {
     public TezCounters getCounters(TaskSpec taskSpec);
   }
@@ -402,10 +407,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
               if (countersDelegate != null) {
                  counters = countersDelegate.getCounters(cData.taskSpec);
               }
+              TaskStatistics stats = null;
+              if (statsDelegate != null) {
+                stats = statsDelegate.getStatistics(cData.taskSpec);
+              }
               cData.numUpdates++;
               float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
               float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
-              events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress), new EventMetaData(
+              events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
                   cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 1b8acab..8be60c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -43,10 +43,12 @@ import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -59,7 +61,9 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
+import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -68,10 +72,13 @@ import org.apache.tez.dag.app.dag.impl.VertexImpl;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.IOStatistics;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -332,6 +339,91 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
   }
   
+  @Test (timeout = 10000)
+  public void testBasicStatistics() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false);
+    tezClient.start();
+
+    final String vAName = "A";
+    final String vBName = "B";
+    final String sourceName = "In";
+    final String sinkName = "Out";
+    DAG dag = DAG.create("testBasicStatistics");
+    Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 3);
+    Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 2);
+    vA.addDataSource(sourceName,
+        DataSourceDescriptor.create(InputDescriptor.create("In"), null, null));
+    vB.addDataSink(sinkName, DataSinkDescriptor.create(OutputDescriptor.create("Out"), null, null));
+    dag.addVertex(vA)
+        .addVertex(vB)
+        .addEdge(
+            Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+    IOStatistics ioStats = new IOStatistics();
+    ioStats.setDataSize(1);
+    TaskStatistics vAStats = new TaskStatistics();
+    vAStats.addIO(vBName, ioStats);
+    vAStats.addIO(sourceName, ioStats);
+    TaskStatistics vBStats = new TaskStatistics();
+    vBStats.addIO(vAName, ioStats);
+    vBStats.addIO(sinkName, ioStats);
+    ByteArrayOutputStream bosA = new ByteArrayOutputStream();
+    DataOutput outA = new DataOutputStream(bosA);
+    vAStats.write(outA);
+    final byte[] payloadA = bosA.toByteArray();
+    ByteArrayOutputStream bosB = new ByteArrayOutputStream();
+    DataOutput outB = new DataOutputStream(bosB);
+    vBStats.write(outB);
+    final byte[] payloadB = bosB.toByteArray();
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.statsDelegate = new StatisticsDelegate() {
+      @Override
+      public TaskStatistics getStatistics(TaskSpec taskSpec) {
+        byte[] payload = payloadA;
+        TaskStatistics stats = new TaskStatistics();
+        if (taskSpec.getVertexName().equals(vBName)) {
+          payload = payloadB;
+        }
+        final DataInputByteBuffer in = new DataInputByteBuffer();
+        in.reset(ByteBuffer.wrap(payload));
+        try {
+          // this ensures that the serde code path is covered.
+          stats.readFields(in);
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+        return stats;
+      }
+    };
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+
+    // every task reports a data size of 1 per IO, so each aggregate equals the task count
+    for (org.apache.tez.dag.app.dag.Vertex v : dagImpl.getVertices().values()) {
+      VertexStatistics vStats = v.getStatistics();
+      if (v.getName().equals(vAName)) {
+        Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getDataSize());
+        Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize());
+      } else {
+        Assert.assertEquals(2, vStats.getInputStatistics(vAName).getDataSize());
+        Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize());
+      }
+    }
+
+    tezClient.stop();
+  }
+  
   private void checkMemory(String name, MockDAGAppMaster mockApp) {                
     long mb = 1024*1024;                                                           
                                                                                    
@@ -401,6 +493,70 @@ public class TestMockDAGAppMaster {
     checkMemory(dag.getName(), mockApp);
     tezClient.stop();
   }
+
+  @Ignore
+  @Test (timeout = 60000)
+  public void testBasicStatisticsMemory() throws Exception {
+    Logger.getRootLogger().setLevel(Level.WARN); // NOTE: not restored afterwards; acceptable only while @Ignore'd
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false);
+    tezClient.start();
+
+    final String vAName = "abcdefghijklmnopqrstuvwxyz";
+    int numTasks = 10000;
+    int numSources = 10;
+
+    IOStatistics ioStats = new IOStatistics();
+    ioStats.setDataSize(1);
+    TaskStatistics vAStats = new TaskStatistics();
+
+    DAG dag = DAG.create("testBasicStatisticsMemory");
+    Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks);
+    for (int i=0; i<numSources; ++i) {
+      final String sourceName = i + vAName;
+      vA.addDataSource(sourceName,
+          DataSourceDescriptor.create(InputDescriptor.create(sourceName), null, null));
+      vAStats.addIO(sourceName, ioStats);
+    }
+    dag.addVertex(vA);
+
+    ByteArrayOutputStream bosA = new ByteArrayOutputStream();
+    DataOutput outA = new DataOutputStream(bosA);
+    vAStats.write(outA);
+    final byte[] payloadA = bosA.toByteArray();
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.statsDelegate = new StatisticsDelegate() {
+      @Override
+      public TaskStatistics getStatistics(TaskSpec taskSpec) {
+        byte[] payload = payloadA;
+        TaskStatistics stats = new TaskStatistics();
+        final DataInputByteBuffer in = new DataInputByteBuffer();
+        in.reset(ByteBuffer.wrap(payload));
+        try {
+          // this ensures that the serde code path is covered.
+          stats.readFields(in);
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+        return stats;
+      }
+    };
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    Assert.assertEquals(numTasks,
+        dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getDataSize());
+    checkMemory(dag.getName(), mockApp);
+    tezClient.stop();
+  }
   
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 224976c..2a2df7c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -630,7 +630,7 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(0,
             expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
-    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
     
     taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
         TaskAttemptTerminationCause.APPLICATION_ERROR));
@@ -732,7 +732,7 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(0,
             expectedEventsAtRunning), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
     
-    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null)));
     
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 56b2627..0b048da 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -27,7 +27,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -45,7 +44,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.RunnableWithNdc;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -75,6 +73,7 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezInputContextImpl;
 import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
@@ -86,7 +85,6 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -119,6 +117,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   /** Maps which will be provided to the processor run method */
   private final LinkedHashMap<String, LogicalInput> runInputMap;
   private final LinkedHashMap<String, LogicalOutput> runOutputMap;
+  
+  private final TaskStatistics statistics;
 
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final Map<String, String> envMap;
@@ -182,6 +182,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.objectRegistry = objectRegistry;
     this.ExecutionContext = ExecutionContext;
     this.memAvailable = memAvailable;
+    this.statistics = new TaskStatistics();
   }
 
   /**
@@ -324,6 +325,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     startRouterThread();
   }
+  
+  public TaskStatistics getTaskStatistics() { // single instance created in the constructor; IO contexts register into it
+    return statistics;
+  }
 
   public void run() throws Exception {
     Preconditions.checkState(this.state.get() == State.INITED,

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
index 47c2998..875a345 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -25,18 +25,21 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.TaskStatistics;
 
 public class TaskStatusUpdateEvent extends Event implements Writable {
 
   private TezCounters tezCounters;
   private float progress;
+  private TaskStatistics statistics;
 
   public TaskStatusUpdateEvent() {
   }
 
-  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress, TaskStatistics statistics) {
     this.tezCounters = tezCounters;
     this.progress = progress;
+    this.statistics = statistics;
   }
 
   public TezCounters getCounters() {
@@ -46,6 +49,10 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
   public float getProgress() {
     return progress;
   }
+  
+  public TaskStatistics getStatistics() { // may be null: statistics are optional on this event
+    return statistics;
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
@@ -56,6 +63,13 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
     } else {
       out.writeBoolean(false);
     }
+    // Always emit the presence flag: readFields() reads it unconditionally.
+    if (statistics != null) {
+      out.writeBoolean(true);
+      statistics.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
   @Override
@@ -65,6 +79,10 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
       tezCounters = new TezCounters();
       tezCounters.readFields(in);
     }
+    if (in.readBoolean()) {
+      statistics = new TaskStatistics();
+      statistics.readFields(in);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
new file mode 100644
index 0000000..ede9205
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+/**
+ * Statistics for a single input or output of a task.
+ *
+ * <p>{@code dataSize} is volatile because it may be updated and serialized from different
+ * threads; a plain {@code long} guarantees neither cross-thread visibility nor atomic
+ * 64-bit access (JLS 17.7).
+ */
+public class IOStatistics {
+  private volatile long dataSize = 0;
+
+  public void setDataSize(long size) {
+    this.dataSize = size;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
new file mode 100644
index 0000000..b50d099
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringInterner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Per-task statistics keyed by input/output name; carried in task status updates.
+ */
+public class TaskStatistics implements Writable {
+  // The memory usage of this is minimal (<10MB for 10K tasks x 10 inputs)
+  // TestMockDAGAppMaster#testBasicStatisticsMemory
+  private final Map<String, IOStatistics> ioStatistics = Maps.newConcurrentMap();
+
+  public void addIO(String edgeName) {
+    addIO(edgeName, new IOStatistics());
+  }
+
+  public void addIO(String edgeName, IOStatistics stats) {
+    Preconditions.checkArgument(stats != null, edgeName);
+    ioStatistics.put(StringInterner.weakIntern(edgeName), stats);
+  }
+
+  public Map<String, IOStatistics> getIOStatistics() {
+    return ioStatistics;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Snapshot first so the written count always matches the entries that follow, even if
+    // an IO is registered concurrently.
+    List<Map.Entry<String, IOStatistics>> entries =
+        new ArrayList<Map.Entry<String, IOStatistics>>(ioStatistics.entrySet());
+    out.writeInt(entries.size());
+    for (Map.Entry<String, IOStatistics> entry : entries) {
+      Text.writeString(out, entry.getKey());
+      out.writeLong(entry.getValue().getDataSize());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Replace previous contents so a reused instance does not keep stale entries.
+    ioStatistics.clear();
+    int numEntries = in.readInt();
+    for (int i = 0; i < numEntries; ++i) {
+      String edgeName = Text.readString(in);
+      IOStatistics edgeStats = new IOStatistics();
+      edgeStats.setDataSize(in.readLong());
+      addIO(edgeName, edgeStats);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index bd41aed..a9b7eab 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -37,9 +37,10 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
-import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputStatisticsReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.ObjectRegistry;
@@ -55,6 +56,18 @@ public class TezInputContextImpl extends TezTaskContextImpl
   private final int inputIndex;
   private final Map<String, LogicalInput> inputs;
   private final InputReadyTracker inputReadyTracker;
+  private final InputStatisticsReporterImpl statsReporter;
+  
+  class InputStatisticsReporterImpl implements InputStatisticsReporter { // updates this input's entry in the task-wide TaskStatistics
+
+    @Override
+    public synchronized void reportDataSize(long size) {
+      // Concurrent map; this input's entry is added once by the constructor (addIO), so get() is non-null.
+      runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName)
+          .setDataSize(size);
+    }
+    
+  }
 
   @Private
   public TezInputContextImpl(Configuration conf, String[] workDirs,
@@ -63,7 +76,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              String taskVertexName, String sourceVertexName,
                              int vertexParallelism, TezTaskAttemptID taskAttemptID,
                              int inputIndex, @Nullable UserPayload userPayload,
-                             RuntimeTask runtimeTask,
+                             LogicalIOProcessorRuntimeTask runtimeTask,
                              Map<String, ByteBuffer> serviceConsumerMetadata,
                              Map<String, String> auxServiceEnv, MemoryDistributor memDist,
                              InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
@@ -86,9 +99,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
         taskAttemptID);
     this.inputs = inputs;
     this.inputReadyTracker = inputReadyTracker;
+    runtimeTask.getTaskStatistics().addIO(sourceVertexName);
+    statsReporter = new InputStatisticsReporterImpl();
   }
 
-  private static TezCounters wrapCounters(RuntimeTask task, String taskVertexName,
+  private static TezCounters wrapCounters(LogicalIOProcessorRuntimeTask task, String taskVertexName,
       String edgeVertexName, Configuration conf) {
     TezCounters tezCounters = task.addAndGetTezCounter(edgeVertexName);
     if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
@@ -134,4 +149,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public void inputIsReady() {
     inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName));
   }
-}
\ No newline at end of file
+
+  @Override
+  public InputStatisticsReporter getStatisticsReporter() {
+    return statsReporter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 8d758f0..17513dd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -36,11 +36,12 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
@@ -51,6 +52,18 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;
+  private final OutputStatisticsReporterImpl statsReporter;
+  
+  class OutputStatisticsReporterImpl implements OutputStatisticsReporter {
+
+    @Override
+    public synchronized void reportDataSize(long size) {
+      // this is a concurrent map. Plus we are not adding/deleting entries
+      runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName)
+      .setDataSize(size);
+    }
+    
+  }
 
   @Private
   public TezOutputContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
@@ -59,7 +72,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       String destinationVertexName,
       int vertexParallelism,
       TezTaskAttemptID taskAttemptID, int outputIndex,
-      @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
+      @Nullable UserPayload userPayload, LogicalIOProcessorRuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry,
@@ -76,9 +89,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
         taskVertexName, destinationVertexName, taskAttemptID);
+    runtimeTask.getTaskStatistics().addIO(destinationVertexName);
+    statsReporter = new OutputStatisticsReporterImpl();
   }
 
-  private static TezCounters wrapCounters(RuntimeTask runtimeTask, String taskVertexName,
+  private static TezCounters wrapCounters(LogicalIOProcessorRuntimeTask runtimeTask, String taskVertexName,
       String edgeVertexName, Configuration conf) {
     TezCounters tezCounters = runtimeTask.addAndGetTezCounter(edgeVertexName);
     if (conf.getBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO,
@@ -119,4 +134,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   public int getOutputIndex() {
     return outputIndex;
   }
+
+  @Override
+  public OutputStatisticsReporter getStatisticsReporter() {
+    return statsReporter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index edfd8c9..a74ccac 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -37,7 +37,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
-import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
@@ -55,7 +55,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
   public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
       TezUmbilical tezUmbilical, String dagName, String vertexName,
       int vertexParallelism, TezTaskAttemptID taskAttemptID,
-      @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
+      @Nullable UserPayload userPayload, LogicalIOProcessorRuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 527b822..6c0a869 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -53,7 +54,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
   private final TezCounters counters;
   private String[] workDirs;
   private String uniqueIdentifier;
-  protected final RuntimeTask runtimeTask;
+  protected final LogicalIOProcessorRuntimeTask runtimeTask;
   protected final TezUmbilical tezUmbilical;
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final int appAttemptNumber;
@@ -69,7 +70,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
   @Private
   public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
       String dagName, String taskVertexName, int vertexParallelism, 
-      TezTaskAttemptID taskAttemptID, TezCounters counters, RuntimeTask runtimeTask,
+      TezTaskAttemptID taskAttemptID, TezCounters counters, LogicalIOProcessorRuntimeTask runtimeTask,
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry,

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 4c07f5a..48be8bd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -223,7 +223,7 @@ public class TaskReporter {
       eventsToSend.drainTo(events);
 
       if (!task.isTaskDone() && !task.hadFatalError()) {
-        TezCounters counters = null;
+        boolean sendCounters = false;
         /**
          * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
          * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
@@ -231,11 +231,10 @@ public class TaskReporter {
          */
         // Not completely accurate, since OOB heartbeats could go out.
         if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
-          counters = task.getCounters();
+          sendCounters = true;
           prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
         }
-        updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
-            updateEventMetadata);
+        updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
         events.add(updateEvent);
       }
 
@@ -313,12 +312,16 @@ public class TaskReporter {
      *           indicates an exception somewhere in the AM.
      */
     private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
+      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
       TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
           updateEventMetadata);
       return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
     }
+    
+    private TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
+      return new TaskStatusUpdateEvent((sendCounters ? task.getCounters() : null),
+          task.getProgress(), task.getTaskStatistics());
+    }
 
     /**
      * Sends out final events for task failure.
@@ -334,8 +337,7 @@ public class TaskReporter {
      */
     private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
         EventMetaData srcMeta) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
+      TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
       if (diagnostics == null) {
         diagnostics = ExceptionUtils.getStackTrace(t);
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index a807bad..d2d9ed8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -185,6 +185,11 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     if (shuffle != null) {
       shuffle.shutdown();
     }
+    
+    long outputSize = getContext().getCounters()
+        .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    
     return Collections.emptyList();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 6ad58ba..f2f47e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -208,6 +208,11 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     if (this.shuffleManager != null) {
       this.shuffleManager.shutdown();
     }
+    
+    long outputSize = getContext().getCounters()
+        .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 518d214..b03c674 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
@@ -159,16 +161,22 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws IOException {
+    List<Event> returnEvents = null;
     if (sorter != null) {
       sorter.flush();
       sorter.close();
       this.endTime = System.nanoTime();
-      return generateEvents();
+      returnEvents = generateEvents();
     } else {
       LOG.warn("Attempting to close output " + getContext().getDestinationVertexName()
           + " before it was started");
-      return Collections.emptyList();
+      returnEvents = Collections.emptyList();
     }
+    
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    
+    return returnEvents;
   }
 
   private List<Event> generateEvents() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 9914735..6c9077e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -120,12 +121,18 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents = null;
     if (isStarted.get()) {
       //TODO: Do we need to support sending payloads via events?
-      return kvWriter.close();
+      returnEvents = kvWriter.close();
     } else {
-      return Collections.emptyList();
+      returnEvents = Collections.emptyList();
     }
+    
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    
+    return returnEvents;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index af66c62..8b83f9b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
@@ -97,11 +98,17 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents = null;
     if (isStarted.get()) {
-      return kvWriter.close();
+      returnEvents = kvWriter.close();
     } else {
-      return Collections.emptyList();
+      returnEvents = Collections.emptyList();
     }
+
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    
+    return returnEvents;
   }
 
   private static final Set<String> confKeys = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/tez/blob/bd9b8d95/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index cfe1e6f..8e43f21 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.output;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -56,6 +58,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -82,6 +85,8 @@ public class TestOnFileSortedOutput {
   private int partitions;
   //For sorter (pipelined / Default)
   private int sorterThreads;
+  
+  final AtomicLong outputSize = new AtomicLong();
 
   private KeyValuesWriter writer;
   private OrderedPartitionedKVOutput sortedOutput;
@@ -123,6 +128,7 @@ public class TestOnFileSortedOutput {
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
         sendEmptyPartitionViaEvent);
 
+    outputSize.set(0);
     fs.mkdirs(workingDir);
     this.partitions = Math.max(1, rnd.nextInt(10));
   }
@@ -335,7 +341,15 @@ public class TestOnFileSortedOutput {
     serviceProviderMetaData.writeInt(PORT);
 
     TezCounters counters = new TezCounters();
-
+    
+    OutputStatisticsReporter reporter = mock(OutputStatisticsReporter.class);
+    doAnswer(new Answer() {
+      @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+        outputSize.set((Long) invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(reporter).reportDataSize(anyLong());
+    
     OutputContext context = mock(OutputContext.class);
     doReturn(counters).when(context).getCounters();
     doReturn(workingDirs).when(context).getWorkDirs();
@@ -357,6 +371,7 @@ public class TestOnFileSortedOutput {
     }).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
     ExecutionContext ExecutionContext = mock(ExecutionContext.class);
     doReturn(HOST).when(ExecutionContext).getHostName();
+    doReturn(reporter).when(context).getStatisticsReporter();
     doReturn(ExecutionContext).when(context).getExecutionContext();
     return context;
   }


Mime
View raw message