tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject tez git commit: TEZ-3739. Fair CartesianProduct doesn't work well with a huge difference in output size (zhiyuany)
Date Wed, 31 May 2017 20:51:46 GMT
Repository: tez
Updated Branches:
  refs/heads/master 01db837c4 -> 7993156c0


TEZ-3739. Fair CartesianProduct doesn't work well with a huge difference in output size (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7993156c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7993156c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7993156c

Branch: refs/heads/master
Commit: 7993156c0ccf209b0c371c1c7cfd805ea721c6f3
Parents: 01db837
Author: Zhiyuan Yang <zhiyuany@apache.org>
Authored: Wed May 31 13:49:40 2017 -0700
Committer: Zhiyuan Yang <zhiyuany@apache.org>
Committed: Wed May 31 13:49:40 2017 -0700

----------------------------------------------------------------------
 .../FairCartesianProductVertexManager.java      | 56 +++++++++++++++-----
 .../TestFairCartesianProductVertexManager.java  | 47 ++++++++++++++++
 2 files changed, 91 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7993156c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
index 86e2080..7b519e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java
@@ -434,22 +434,17 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     }
     LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism);
 
-    // determine num chunk for each source by weighted factorization of initial parallelism
-    // final parallelism will be product of all #chunk
-    double k = Math.log10(parallelism);
-    for (Source src : sourcesByName.values()) {
-      k -= Math.log10(src.numRecord);
+    if (enableGrouping) {
+      determineNumChunks(sourcesByName, parallelism);
+    } else {
+      for (Source src : sourcesByName.values()) {
+        src.numChunk = src.getSrcVertexWithMostOutput().numTask;
+      }
     }
-    k = Math.pow(10, k / sourcesByName.size());
 
+    // final parallelism will be product of all #chunk
     parallelism = 1;
     for (Source src : sourcesByName.values()) {
-      if (enableGrouping) {
-        src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
-          Math.max(1, (int) (src.numRecord * k)));
-      } else {
-        src.numChunk = src.getSrcVertexWithMostOutput().numTask;
-      }
       parallelism *= src.numChunk;
     }
 
@@ -491,6 +486,43 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea
     return true;
   }
 
+  /**
+   * determine num chunk for each source by weighted factorization of initial parallelism
+   **/
+  private void determineNumChunks(Map<String, Source> sourcesByName, int parallelism) {
+    // first round: set numChunk to 1 if source output is too small
+    double k = Math.log10(parallelism);
+    for (Source src : sourcesByName.values()) {
+      k -= Math.log10(src.numRecord);
+    }
+    k = Math.pow(10, k / sourcesByName.size());
+
+    for (Source src : sourcesByName.values()) {
+      if (src.numRecord * k < 2) {
+        src.numChunk = 1;
+      }
+    }
+
+    // second round: weighted factorization
+    k = Math.log10(parallelism);
+    int numLargeSrc = 0;
+    for (Source src : sourcesByName.values()) {
+      if (src.numChunk != 1) {
+        k -= Math.log10(src.numRecord);
+        numLargeSrc++;
+      }
+    }
+    k = Math.pow(10, k / numLargeSrc);
+
+    for (Source src : sourcesByName.values()) {
+      if (src.numChunk != 1) {
+        src.numChunk = Math.min(maxParallelism,
+          Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
+            Math.max(1, (int) (src.numRecord * k))));
+      }
+    }
+  }
+
   private void tryScheduleTasks() throws IOException {
     if (!vertexReconfigured && !tryReconfigure()) {
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/7993156c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
index d088fd3..6219a15 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java
@@ -497,4 +497,51 @@ public class TestFairCartesianProductVertexManager {
     verify(ctx, times(1)).reconfigureVertex(
       eq(6), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
   }
+
+  @Test(timeout = 5000)
+  public void testParallelismTwoSkewedSource() throws Exception {
+    setupDAGVertexOnly(100, 10000, 10, 10);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(15000, "v0", 0));
+
+    for (int i = 0; i < 30; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEvent(1, "v1", i));
+    }
+
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(99), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"},
+      new int[]{99, 1}, 100);
+  }
+
+  @Test(timeout = 5000)
+  public void testParallelismThreeSkewedSource() throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
+    setSrcParallelism(ctx, 10, 2, 3, 4);
+
+    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder();
+    builder.setIsPartitioned(false).addSources("v0").addSources("v1").addSources("v2")
+      .setMaxParallelism(100).setMinOpsPerWorker(10000)
+      .setNumPartitionsForFairCase(10);
+    vertexManager.initialize(builder.build());
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));
+
+    vertexManager.onVertexManagerEventReceived(getVMEvent(60000, "v0", 0));
+    vertexManager.onVertexManagerEventReceived(getVMEvent(4000, "v1", 0));
+    for (int i = 0; i < 40; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEvent(3, "v2", i));
+    }
+
+    verify(ctx, times(1)).reconfigureVertex(
+      eq(93), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1", "v2"},
+      new int[]{31, 3, 1}, 100);
+  }
 }
\ No newline at end of file


Mime
View raw message