tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3303. Have ShuffleVertexManager consume more precise partition stats. Contributed by Tsuyoshi Ozawa.
Date Wed, 13 Jul 2016 05:19:57 GMT
Repository: tez
Updated Branches:
  refs/heads/master 8131896b3 -> 91279010b


TEZ-3303. Have ShuffleVertexManager consume more precise partition
stats. Contributed by Tsuyoshi Ozawa.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/91279010
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/91279010
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/91279010

Branch: refs/heads/master
Commit: 91279010b6b72afa2ab9ca357dcf07356bd90ac6
Parents: 8131896
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jul 12 22:19:08 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jul 12 22:19:08 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     | 13 ++++
 .../vertexmanager/TestShuffleVertexManager.java | 65 +++++++++++++++++---
 3 files changed, 69 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c1f0cf..9aab833 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3303. Have ShuffleVertexManager consume more precise partition stats.
   TEZ-1248. Reduce slow-start should special case 1 reducer runs.
   TEZ-3327. ATS Parser: Populate config details available in dag.
   TEZ-3325. Flaky test in TestDAGImpl.testCounterLimits.

http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 7d9822c..c8a1f30 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -555,6 +555,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
 
   @VisibleForTesting
+  void parseDetailedPartitionStats(List<Integer> partitionStats) {
+    Preconditions.checkState(stats != null, "Stats should be initialized");
+    for (int i = 0; i< partitionStats.size(); i++) {
+      stats[i] += partitionStats.get(i);
+    }
+  }
+
+  @VisibleForTesting
   void parsePartitionStats(RoaringBitmap partitionStats) {
     Preconditions.checkState(stats != null, "Stats should be initialized");
     Iterator<Integer> it = partitionStats.iterator();
@@ -618,10 +626,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
           partitionStats.deserialize(new DataInputStream(bin));
 
           parsePartitionStats(partitionStats);
+
         } catch (IOException e) {
           throw new TezUncheckedException(e);
         }
+      } else if (proto.hasDetailedPartitionStats()) {
+        List<Integer> detailedPartitionStats = proto.getDetailedPartitionStats().getSizeInMbList();
+        parseDetailedPartitionStats(detailedPartitionStats);
       }
+
       srcInfo.numVMEventsReceived++;
       srcInfo.outputSize += sourceTaskOutputSize;
       completedSourceTasksOutputSize += sourceTaskOutputSize;

http://git-wip-us.apache.org/repos/asf/tez/blob/91279010/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 2566c94..a5a6581 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -330,9 +330,9 @@ public class TestShuffleVertexManager {
      */
     scheduledTasks.clear();
     //{5,9,12,18} in bitmap
-    long[] sizes = new long[]{(0l), (1000l * 1000l),
-                              (1010 * 1000l * 1000l), (50 * 1000l * 1000l)};
-    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex");
+    final long MB = 1024l * 1024l;
+    long[] sizes = new long[]{(0l), (1 * MB), (964 * MB), (48 * MB)};
+    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", false);
 
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
     manager.onVertexStarted(emptyCompletions);
@@ -362,6 +362,37 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
     Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
 
+    // Testing for detailed partition stats
+    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex", true);
+
+    manager = createManager(conf, mockContext, 0.01f, 0.75f);
+    manager.onVertexStarted(emptyCompletions);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]);
+    Assert.assertEquals(1, manager.stats[1]);
+    Assert.assertEquals(964, manager.stats[2]);
+    Assert.assertEquals(48, manager.stats[3]);
+
+    // sending again from a different version of the same task has not impact
+    taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]);
+    Assert.assertEquals(1, manager.stats[1]);
+    Assert.assertEquals(964, manager.stats[2]);
+    Assert.assertEquals(48, manager.stats[3]);
+
     /**
      * Test for TEZ-978
      * Delay determining parallelism until enough data has been received.
@@ -1017,7 +1048,11 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
 
-  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) throws IOException {
+    return getVertexManagerEvent(sizes, totalSize, vertexName, false);
+  }
+
+  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName, boolean reportDetailedStats)
       throws IOException {
     ByteBuffer payload = null;
     if (sizes != null) {
@@ -1026,12 +1061,22 @@ public class TestShuffleVertexManager {
       partitionStats.serialize(dout);
       ByteString
           partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
-      payload =
-          VertexManagerEventPayloadProto.newBuilder()
-              .setOutputSize(totalSize)
-              .setPartitionStats(partitionStatsBytes)
-              .build().toByteString()
-              .asReadOnlyByteBuffer();
+      if (reportDetailedStats) {
+        payload =
+            VertexManagerEventPayloadProto.newBuilder()
+                .setOutputSize(totalSize)
+                .setDetailedPartitionStats(ShuffleUtils.getDetailedPartitionStatsForPhysicalOutput(sizes))
+                .build().toByteString()
+                .asReadOnlyByteBuffer();
+      } else {
+        payload =
+            VertexManagerEventPayloadProto.newBuilder()
+                .setOutputSize(totalSize)
+                .setPartitionStats(partitionStatsBytes)
+                .build().toByteString()
+                .asReadOnlyByteBuffer();
+      }
+
     } else {
       payload =
           VertexManagerEventPayloadProto.newBuilder()


Mime
View raw message