tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [22/50] [abbrv] tez git commit: TEZ-2308. Add set/get of record counts in task/vertex statistics (bikas)
Date Fri, 24 Apr 2015 00:26:16 GMT
TEZ-2308. Add set/get of record counts in task/vertex statistics (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f46997a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f46997a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f46997a7

Branch: refs/heads/TEZ-2003
Commit: f46997a7cbc5474e3368c852f31a95c97da3b6a4
Parents: 87aac12
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 21 14:01:59 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 21 14:01:59 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/runtime/api/InputStatistics.java |  9 +++++
 .../runtime/api/InputStatisticsReporter.java    | 10 ++++++
 .../tez/runtime/api/OutputStatistics.java       |  9 +++++
 .../runtime/api/OutputStatisticsReporter.java   |  9 +++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 17 +++++-----
 .../tez/dag/app/TestMockDAGAppMaster.java       | 12 +++++--
 .../org/apache/tez/mapreduce/input/MRInput.java |  4 +++
 .../tez/mapreduce/input/MultiMRInput.java       |  5 +++
 .../apache/tez/mapreduce/output/MROutput.java   |  4 +++
 .../tez/runtime/api/impl/IOStatistics.java      | 35 +++++++++++++++++++-
 .../tez/runtime/api/impl/TaskStatistics.java    |  4 +--
 .../runtime/api/impl/TezInputContextImpl.java   |  7 ++++
 .../runtime/api/impl/TezOutputContextImpl.java  |  7 ++++
 .../library/input/OrderedGroupedKVInput.java    |  7 ++--
 .../runtime/library/input/UnorderedKVInput.java |  7 ++--
 .../output/OrderedPartitionedKVOutput.java      |  5 ++-
 .../library/output/UnorderedKVOutput.java       |  6 ++--
 .../output/UnorderedPartitionedKVOutput.java    |  5 ++-
 .../library/output/TestOnFileSortedOutput.java  | 15 +++++++--
 .../output/TestOnFileUnorderedKVOutput.java     |  2 ++
 21 files changed, 157 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4ba2f2f..5293120 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2308. Add set/get of record counts in task/vertex statistics
   TEZ-2344. Tez UI: Equip basic-ember-table's cell level loading for all use cases in all DAGs table
   TEZ-2313. Regression in handling obsolete events in ShuffleScheduler.
   TEZ-2212. Notify components on DAG completion.

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
index fb99f2d..1066dbb 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatistics.java
@@ -37,4 +37,13 @@ public interface InputStatistics {
    * @return Data size in bytes
    */
  public long getDataSize();
+ 
+  /**
+   * Get the numbers of items processed. These could be key-value pairs, table
+   * records etc.
+   * 
+   * @return Number of items processed
+   */
+ public long getItemsProcessed();
+ 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
index 68a56e7..a85d25b 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputStatisticsReporter.java
@@ -34,4 +34,14 @@ public interface InputStatisticsReporter {
    */
   public void reportDataSize(long size);
 
+  /**
+   * Report the number of items processed. These could be key-value pairs, table
+   * records etc.
+   * 
+   * @param items
+   *          number of items
+   */
+  public void reportItemsProcessed(long items);
+  
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
index 0373606..2f18a03 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatistics.java
@@ -38,4 +38,13 @@ public interface OutputStatistics {
    * @return Data size in bytes
    */
  public long getDataSize();
+ 
+ /**
+   * Get the numbers of items processed. These could be key-value pairs, table
+   * records etc.
+   * 
+   * @return Number of items processed
+  */
+ public long getItemsProcessed();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
index fc9f1b7..1931e5c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputStatisticsReporter.java
@@ -34,4 +34,13 @@ public interface OutputStatisticsReporter {
    */
   public void reportDataSize(long size);
 
+  /**
+   * Report the number of items processed. These could be key-value pairs, table
+   * records etc.
+   * 
+   * @param items
+   *          number of items
+   */
+  public void reportItemsProcessed(long items);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b4b8062..5dfcb8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -788,10 +788,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       return super.getDataSize();
     }
     
-    void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) {
-      this.setDataSize(this.getDataSize() + other.getDataSize());
+    @Override
+    public long getItemsProcessed() {
+      return super.getItemsProcessed();
     }
-    
   }
 
   class VertexStatisticsImpl implements VertexStatistics {
@@ -813,12 +813,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         return;
       }
       
-      for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats.getIOStatistics().entrySet()) {
-        String edgeName = entry.getKey();
-        IOStatisticsImpl myEdgeStat = ioStats.get(edgeName);
-        Preconditions.checkState(myEdgeStat != null, "Unexpected IO name: " + edgeName
+      for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats
+          .getIOStatistics().entrySet()) {
+        String ioName = entry.getKey();
+        IOStatisticsImpl myIOStat = ioStats.get(ioName);
+        Preconditions.checkState(myIOStat != null, "Unexpected IO name: " + ioName
             + " for vertex:" + getLogIdentifier());
-        myEdgeStat.mergeFrom(entry.getValue());
+        myIOStat.mergeFrom(entry.getValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 4d01117..db1d632 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -376,6 +376,7 @@ public class TestMockDAGAppMaster {
                 OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
     IOStatistics ioStats = new IOStatistics();
     ioStats.setDataSize(1);
+    ioStats.setItemsProcessed(1);
     TaskStatistics vAStats = new TaskStatistics();
     vAStats.addIO(vBName, ioStats);
     vAStats.addIO(sourceName, ioStats);
@@ -426,10 +427,14 @@ public class TestMockDAGAppMaster {
       VertexStatistics vStats = v.getStatistics();
       if (v.getName().equals(vAName)) {
         Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getDataSize());
-        Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize());    
   
+        Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize());
+        Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getItemsProcessed());
+        Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getItemsProcessed());
       } else {
         Assert.assertEquals(2, vStats.getInputStatistics(vAName).getDataSize());
-        Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize());     
  
+        Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize());
+        Assert.assertEquals(2, vStats.getInputStatistics(vAName).getItemsProcessed());
+        Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getItemsProcessed());
       }
     }
     
@@ -521,6 +526,7 @@ public class TestMockDAGAppMaster {
 
     IOStatistics ioStats = new IOStatistics();
     ioStats.setDataSize(1);
+    ioStats.setItemsProcessed(1);
     TaskStatistics vAStats = new TaskStatistics();
 
     DAG dag = DAG.create("testBasisStatistics");
@@ -566,6 +572,8 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
     Assert.assertEquals(numTasks,
         dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getDataSize());
+    Assert.assertEquals(numTasks,
+        dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getItemsProcessed());
     checkMemory(dag.getName(), mockApp);
     tezClient.stop();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 2fb1647..991f6d1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -532,6 +532,10 @@ public class MRInput extends MRInputBase {
   @Override
   public List<Event> close() throws IOException {
     mrReader.close();
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index 9596f07..425d737 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.mapreduce.input.base.MRInputBase;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
 import org.apache.tez.mapreduce.lib.MRReader;
@@ -187,6 +188,10 @@ public class MultiMRInput extends MRInputBase {
     for (MRReader reader : readers) {
       reader.close();
     }
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 349a894..483c92b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -507,6 +507,10 @@ public class MROutput extends AbstractLogicalOutput {
   @Override
   public synchronized List<Event> close() throws IOException {
     flush();
+    long outputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
     return null;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
index ede9205..0f8b589 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/IOStatistics.java
@@ -17,8 +17,15 @@
 
 package org.apache.tez.runtime.api.impl;
 
-public class IOStatistics {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class IOStatistics implements Writable {
   private long dataSize = 0;
+  private long numItems = 0;
   
   public void setDataSize(long size) {
     this.dataSize = size;
@@ -27,4 +34,30 @@ public class IOStatistics {
   public long getDataSize() {
     return dataSize;
   }
+  
+  public void setItemsProcessed(long items) {
+    this.numItems = items;
+  }
+  
+  public long getItemsProcessed() {
+    return numItems;
+  }
+  
+  public void mergeFrom(org.apache.tez.runtime.api.impl.IOStatistics other) {
+    this.setDataSize(this.getDataSize() + other.getDataSize());
+    this.setItemsProcessed(this.getItemsProcessed() + other.getItemsProcessed());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(getDataSize());
+    out.writeLong(getItemsProcessed());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    setDataSize(in.readLong());
+    setItemsProcessed(in.readLong());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
index b50d099..0b4bef8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskStatistics.java
@@ -54,7 +54,7 @@ public class TaskStatistics implements Writable {
     for (Map.Entry<String, IOStatistics> entry : ioStatistics.entrySet()) {
       IOStatistics edgeStats = entry.getValue();
       Text.writeString(out, entry.getKey());
-      out.writeLong(edgeStats.getDataSize());
+      edgeStats.write(out);
     }
   }
 
@@ -64,7 +64,7 @@ public class TaskStatistics implements Writable {
     for (int i=0; i<numEntries; ++i) {
       String edgeName = Text.readString(in);
       IOStatistics edgeStats = new IOStatistics();
-      edgeStats.setDataSize(in.readLong());
+      edgeStats.readFields(in);
       addIO(edgeName, edgeStats);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index a9b7eab..f6330f3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -66,6 +66,13 @@ public class TezInputContextImpl extends TezTaskContextImpl
       runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName)
           .setDataSize(size);
     }
+
+    @Override
+    public void reportItemsProcessed(long items) {
+      // this is a concurrent map. Plus we are not adding/deleting entries
+      runtimeTask.getTaskStatistics().getIOStatistics().get(sourceVertexName)
+          .setItemsProcessed(items);      
+    }
     
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 17513dd..4045113 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -62,6 +62,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName)
       .setDataSize(size);
     }
+
+    @Override
+    public void reportItemsProcessed(long items) {
+      // this is a concurrent map. Plus we are not adding/deleting entries
+      runtimeTask.getTaskStatistics().getIOStatistics().get(destinationVertexName)
+      .setItemsProcessed(items);;
+    }
     
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index d2d9ed8..e61dbdc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -186,9 +186,12 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
       shuffle.shutdown();
     }
     
-    long outputSize = getContext().getCounters()
+    long dataSize = getContext().getCounters()
         .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
-    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    getContext().getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
     
     return Collections.emptyList();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index f2f47e1..ce27103 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -209,9 +209,12 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       this.shuffleManager.shutdown();
     }
     
-    long outputSize = getContext().getCounters()
+    long dataSize = getContext().getCounters()
         .findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
-    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    getContext().getStatisticsReporter().reportDataSize(dataSize);
+    long inputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.INPUT_RECORDS_PROCESSED).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(inputRecords);
     
     return null;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index df6daf2..40edc76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -192,7 +192,10 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
-    
+    long outputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
     return returnEvents;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 6c9077e..2c26374 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -44,7 +44,6 @@ import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -131,7 +130,10 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
-    
+    long outputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
     return returnEvents;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 8b83f9b..34f2e3e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -107,7 +107,10 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
     getContext().getStatisticsReporter().reportDataSize(outputSize);
-    
+    long outputRecords = getContext().getCounters()
+        .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
     return returnEvents;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 30f78fe..19eb18a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -72,7 +72,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-
+@SuppressWarnings({ "rawtypes", "unchecked" })
 @RunWith(Parameterized.class)
 public class TestOnFileSortedOutput {
   private static final Random rnd = new Random();
@@ -90,6 +90,7 @@ public class TestOnFileSortedOutput {
   private int sorterThreads;
   
   final AtomicLong outputSize = new AtomicLong();
+  final AtomicLong numRecords = new AtomicLong();
 
   private KeyValuesWriter writer;
   private OrderedPartitionedKVOutput sortedOutput;
@@ -135,6 +136,7 @@ public class TestOnFileSortedOutput {
         sendEmptyPartitionViaEvent);
 
     outputSize.set(0);
+    numRecords.set(0);
     fs.mkdirs(workingDir);
     this.partitions = Math.max(1, rnd.nextInt(10));
   }
@@ -271,10 +273,12 @@ public class TestOnFileSortedOutput {
     startSortedOutput(partitions);
 
     //Write random set of keys
+    long recordsWritten = numRecords.get();
     for (int i = 0; i < Math.max(1, rnd.nextInt(50)); i++) {
       Text key = new Text(new BigInteger(256, rnd).toString());
       LinkedList values = new LinkedList();
       for (int j = 0; j < Math.max(2, rnd.nextInt(10)); j++) {
+        recordsWritten++;
         values.add(new Text(new BigInteger(256, rnd).toString()));
       }
       writer.write(key, values);
@@ -282,7 +286,7 @@ public class TestOnFileSortedOutput {
 
     List<Event> eventList = sortedOutput.close();
     assertTrue(eventList != null && eventList.size() == 2);
-
+    assertEquals(recordsWritten, numRecords.get());
     ShuffleUserPayloads.DataMovementEventPayloadProto
         payload = ShuffleUserPayloads.DataMovementEventPayloadProto
         .parseFrom(
@@ -358,6 +362,13 @@ public class TestOnFileSortedOutput {
         return null;
       }
     }).when(reporter).reportDataSize(anyLong());
+    doAnswer(new Answer() {
+      @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+        numRecords.set((Long) invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(reporter).reportItemsProcessed(anyLong());
+
     
     OutputContext context = mock(OutputContext.class);
     doReturn(counters).when(context).getCounters();

http://git-wip-us.apache.org/repos/asf/tez/blob/f46997a7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 41eb9a4..2b25daf 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -81,6 +81,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+@SuppressWarnings("rawtypes")
 public class TestOnFileUnorderedKVOutput {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestOnFileUnorderedKVOutput.class);
@@ -139,6 +140,7 @@ public class TestOnFileUnorderedKVOutput {
 
     events = kvOutput.close();
     assertEquals(45, stats.getIOStatistics().values().iterator().next().getDataSize());
+    assertEquals(5, stats.getIOStatistics().values().iterator().next().getItemsProcessed());
     assertTrue(events != null && events.size() == 1);
     CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
 


Mime
View raw message