tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject tez git commit: TEZ-3697. Adding #output_record in vertex manager event payload (zhiyuany)
Date Mon, 08 May 2017 07:19:32 GMT
Repository: tez
Updated Branches:
  refs/heads/master 4ed4a5693 -> d9f542f4c


TEZ-3697. Adding #output_record in vertex manager event payload (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d9f542f4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d9f542f4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d9f542f4

Branch: refs/heads/master
Commit: d9f542f4ca2168ac7485b1b6d816a9458465e66d
Parents: 4ed4a56
Author: Zhiyuan Yang <zhiyuany@apache.org>
Authored: Mon May 8 00:18:14 2017 -0700
Committer: Zhiyuan Yang <zhiyuany@apache.org>
Committed: Mon May 8 00:18:14 2017 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/ShuffleUtils.java    |  2 ++
 .../src/main/proto/ShufflePayloads.proto        |  1 +
 .../TestUnorderedPartitionedKVWriter.java       | 20 ++++++++++++++++++--
 3 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index caddbc8..efcba70 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -461,6 +461,8 @@ public class ShuffleUtils {
     // multiple events would end up adding up to final output size.
     // This is needed for auto-reduce parallelism to work properly.
     vmBuilder.setOutputSize(outputSize);
+    vmBuilder.setNumRecord(context.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue()
+     + context.getCounters().findCounter(TaskCounter.OUTPUT_LARGE_RECORDS).getValue());
 
     //set partition stats
     if (sizePerPartition != null && sizePerPartition.length > 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index f78cbac..0a4f4a6 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -51,6 +51,7 @@ message VertexManagerEventPayloadProto {
   optional int64 output_size = 1;
   optional bytes partition_stats = 2;
   optional DetailedPartitionStatsProto detailed_partition_stats = 3;
+  optional int64 num_record = 4;
 }
 
 message ShuffleEdgeManagerConfigPayloadProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/d9f542f4/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index d970b95..5cda126 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.writers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -51,6 +52,7 @@ import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -399,6 +401,20 @@ public class TestUnorderedPartitionedKVWriter {
     List<Event> events = kvWriter.close();
     verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class));
 
+    if (!pipeliningEnabled) {
+      VertexManagerEvent vmEvent = null;
+      for (Event event : events) {
+        if (event instanceof VertexManagerEvent) {
+          assertNull(vmEvent);
+          vmEvent = (VertexManagerEvent) event;
+        }
+      }
+      VertexManagerEventPayloadProto vmEventPayload =
+        VertexManagerEventPayloadProto.parseFrom(
+          ByteString.copyFrom(vmEvent.getUserPayload().asReadOnlyBuffer()));
+      assertEquals(numRecordsWritten, vmEventPayload.getNumRecord());
+    }
+
     TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
     assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs,
         outputLargeRecordsCounter.getValue());
@@ -481,8 +497,8 @@ public class TestUnorderedPartitionedKVWriter {
 
   private int[] getPartitionStats(VertexManagerEvent vme) throws IOException {
     RoaringBitmap partitionStats = new RoaringBitmap();
-    ShuffleUserPayloads.VertexManagerEventPayloadProto
-        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto
+    VertexManagerEventPayloadProto
+        payload = VertexManagerEventPayloadProto
         .parseFrom(ByteString.copyFrom(vme.getUserPayload()));
     if (!reportPartitionStats.isEnabled()) {
       assertFalse(payload.hasPartitionStats());


Mime
View raw message