tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject tez git commit: TEZ-3737. FairCartesianProductVertexMananger used incorrect #partition (zhiyuany)
Date Wed, 24 May 2017 19:34:17 GMT
Repository: tez
Updated Branches:
  refs/heads/master 51972efec -> 788c1ad7f


TEZ-3737. FairCartesianProductVertexMananger used incorrect #partition (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/788c1ad7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/788c1ad7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/788c1ad7

Branch: refs/heads/master
Commit: 788c1ad7f6e7cac72c62f42f1fdabcbc9b97892e
Parents: 51972ef
Author: Zhiyuan Yang <zhiyuany@apache.org>
Authored: Wed May 24 12:33:03 2017 -0700
Committer: Zhiyuan Yang <zhiyuany@apache.org>
Committed: Wed May 24 12:33:03 2017 -0700

----------------------------------------------------------------------
 .../FairCartesianProductVertexManager.java      | 50 ++++++++++----------
 .../TestFairCartesianProductVertexManager.java  | 12 ++---
 2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/788c1ad7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
index a38e20d..86e2080 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
@@ -89,7 +89,8 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     // or estimated total number of output record (after reconfiguration)
     long numRecord;
 
-    public String toString(boolean afterReconfigure) {
+    @Override
+    public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Source at position ");
       sb.append(position);
@@ -99,11 +100,11 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
         sb.append(name);
 
       }
-      sb.append("num chunk ").append(numChunk);
+      sb.append(", num chunk ").append(numChunk);
       sb.append(": {");
       for (SrcVertex srcV : srcVertices) {
         sb.append("[");
-        sb.append(srcV.toString(afterReconfigure));
+        sb.append(srcV.toString());
         sb.append("], ");
       }
       sb.deleteCharAt(sb.length() - 1);
@@ -165,17 +166,13 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     // or estimated total number of output record (after reconfiguration)
     long numRecord;
 
-    public String toString(boolean afterReconfigure) {
+    public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("vertex ").append(name).append(", ");
-      if (afterReconfigure) {
-        sb.append("estimated # output records ").append(numRecord).append(", ");
-        sb.append("# chunks ").append(source.numChunk);
-      } else {
-        sb.append(numTask).append(" tasks, ");
-        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
-        sb.append("numRecord ").append(numRecord);
-      }
+      sb.append(numTask).append(" tasks, ");
+      sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
+      sb.append("numRecord ").append(numRecord).append(", ");
+      sb.append("estimated # output records ").append(estimateNumRecord());
       return sb.toString();
     }
 
@@ -189,8 +186,8 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
 
     public boolean isChunkCompleted(int chunkId) {
       grouper.init(numTask * numPartitions, source.numChunk);
-      int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / maxParallelism;
-      int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / maxParallelism;
+      int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / numPartitions;
+      int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / numPartitions;
       for (int relevantTask = firstRelevantTask; relevantTask <= lastRelevantTask; relevantTask++) {
         if (!taskCompleted.contains(relevantTask)) {
           return false;
@@ -274,6 +271,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
           srcVerticesByName.get(srcVName).source = source;
         }
       } else {
+        source.name = srcName;
         source.srcVertices.add(srcVerticesByName.get(srcName));
         srcVerticesByName.get(srcName).source = source;
       }
@@ -292,6 +290,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     throws Exception {
     vertexStarted = true;
     if (completions != null) {
+      LOG.info("OnVertexStarted with " + completions.size() + " completed source task");
       for (TaskAttemptIdentifier attempt : completions) {
         addCompletedSrcTaskToProcess(attempt);
       }
@@ -387,7 +386,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
       for (SrcVertex srcV : srcVerticesByName.values()) {
         if (srcV.taskCompleted.getCardinality() < srcV.numTask
           && (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality()
-            || srcV.numRecord == 0)) {
+          || srcV.numRecord == 0)) {
           return false;
         }
       }
@@ -402,17 +401,19 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
       }
     }
 
-    LOG.info("Start reconfigure, "
+    LOG.info("Start reconfiguring vertex " + getContext().getVertexName()
       + ", max parallelism: " + maxParallelism
-      + ", min-ops-per-worker: " + minOpsPerWorker);
+      + ", min-ops-per-worker: " + minOpsPerWorker
+      + ", num partition: " + numPartitions);
     for (Source src : sourcesByName.values()) {
-      LOG.info(src.toString(false));
+      LOG.info(src.toString());
     }
 
     long totalOps = 1;
     for (Source src : sourcesByName.values()) {
       src.numRecord = src.estimateNumRecord();
       if (src.numRecord == 0) {
+        LOG.info("Set parallelism to 0 because source " + src.name + " has 0 output recorc");
         reconfigureWithZeroTask();
         return true;
       }
@@ -420,6 +421,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
       try {
         totalOps  = LongMath.checkedMultiply(totalOps, src.numRecord);
       } catch (ArithmeticException e) {
+        LOG.info("totalOps exceeds " + Long.MAX_VALUE + ", capping to " + Long.MAX_VALUE);
         totalOps = Long.MAX_VALUE;
       }
     }
@@ -430,6 +432,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     } else {
       parallelism = (int) ((totalOps + minOpsPerWorker - 1) / minOpsPerWorker);
     }
+    LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism);
 
     // determine num chunk for each source by weighted factorization of initial parallelism
     // final parallelism will be product of all #chunk
@@ -450,11 +453,10 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
       parallelism *= src.numChunk;
     }
 
-    LOG.info("After reconfigure, ");
+    LOG.info("After reconfigure, final parallelism " + parallelism);
     for (Source src : sourcesByName.values()) {
-      LOG.info(src.toString(false));
+      LOG.info(src.toString());
     }
-    LOG.info("Final parallelism: " + parallelism);
 
     for (int i = 0; i < numChunksPerSrc.length; i++) {
       numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk;
@@ -516,9 +518,9 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     CartesianProductCombination combination =
       new CartesianProductCombination(numChunksPerSrc, src.position);
 
-    grouper.init(srcV.numTask * maxParallelism, src.numChunk);
-    int firstRelevantChunk = grouper.getGroupId(taskId * maxParallelism);
-    int lastRelevantChunk = grouper.getGroupId(taskId * maxParallelism + maxParallelism - 1);
+    grouper.init(srcV.numTask * numPartitions, src.numChunk);
+    int firstRelevantChunk = grouper.getGroupId(taskId * numPartitions);
+    int lastRelevantChunk = grouper.getGroupId(taskId * numPartitions + numPartitions - 1);
     for (int chunkId = firstRelevantChunk; chunkId <= lastRelevantChunk; chunkId++) {
       combination.firstTaskWithFixedChunk(chunkId);
       do {

http://git-wip-us.apache.org/repos/asf/tez/blob/788c1ad7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
index 01d7f0b..d088fd3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
@@ -80,7 +80,7 @@ public class TestFairCartesianProductVertexManager {
   /**
    * v0 and v1 are two cartesian product sources
    */
-  private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker,
+  private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker, int numPartition,
                                   int srcParallelismMultiplier) throws Exception {
     when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
     setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3);
@@ -88,7 +88,7 @@ public class TestFairCartesianProductVertexManager {
     CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
     builder.setIsPartitioned(false).addSources("v0").addSources("v1")
       .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minOpsPerWorker)
-      .setNumPartitionsForFairCase(maxParallelism);
+      .setNumPartitionsForFairCase(numPartition);
     vertexManager.initialize(builder.build());
   }
 
@@ -218,7 +218,7 @@ public class TestFairCartesianProductVertexManager {
 
   @Test(timeout = 5000)
   public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception {
-    setupDAGVertexOnly(30, 1, 1);
+    setupDAGVertexOnly(30, 1, 30, 1);
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
 
@@ -246,7 +246,7 @@ public class TestFairCartesianProductVertexManager {
 
   @Test(timeout = 5000)
   public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception {
-    setupDAGVertexOnly(100, 10000, 10);
+    setupDAGVertexOnly(100, 10000, 10, 10);
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
 
@@ -366,7 +366,7 @@ public class TestFairCartesianProductVertexManager {
 
   @Test(timeout = 5000)
   public void testOnVertexStart() throws Exception {
-    setupDAGVertexOnly(6, 1, 1);
+    setupDAGVertexOnly(6, 1, 6, 1);
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
     vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0));
@@ -467,7 +467,7 @@ public class TestFairCartesianProductVertexManager {
 
   @Test(timeout = 5000)
   public void testZeroSrcOutput() throws Exception {
-    setupDAGVertexOnly(10, 1, 1);
+    setupDAGVertexOnly(10, 1, 10, 1);
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
     vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 0));


Mime
View raw message