Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EA2B6200C86 for ; Wed, 31 May 2017 22:51:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E8817160BCB; Wed, 31 May 2017 20:51:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 147F6160BC2 for ; Wed, 31 May 2017 22:51:46 +0200 (CEST) Received: (qmail 37924 invoked by uid 500); 31 May 2017 20:51:46 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 37911 invoked by uid 99); 31 May 2017 20:51:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 May 2017 20:51:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2D905DFC2E; Wed, 31 May 2017 20:51:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhiyuany@apache.org To: commits@tez.apache.org Message-Id: <38a94a9e95bf469d95aff26a30885b6d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3739. Fair CartesianProduct doesn't works well with huge difference in output size (zhiyuany) Date: Wed, 31 May 2017 20:51:46 +0000 (UTC) archived-at: Wed, 31 May 2017 20:51:48 -0000 Repository: tez Updated Branches: refs/heads/master 01db837c4 -> 7993156c0 TEZ-3739. Fair CartesianProduct doesn't works well with huge difference in output size (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7993156c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7993156c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7993156c Branch: refs/heads/master Commit: 7993156c0ccf209b0c371c1c7cfd805ea721c6f3 Parents: 01db837 Author: Zhiyuan Yang Authored: Wed May 31 13:49:40 2017 -0700 Committer: Zhiyuan Yang Committed: Wed May 31 13:49:40 2017 -0700 ---------------------------------------------------------------------- .../FairCartesianProductVertexManager.java | 56 +++++++++++++++----- .../TestFairCartesianProductVertexManager.java | 47 ++++++++++++++++ 2 files changed, 91 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7993156c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java index 86e2080..7b519e4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java @@ -434,22 +434,17 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea } LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism); - // determine num chunk for each source by weighted factorization of initial parallelism - // final parallelism will be product of all #chunk - double k = Math.log10(parallelism); - for (Source src : sourcesByName.values()) { - k -= Math.log10(src.numRecord); + if (enableGrouping) { + determineNumChunks(sourcesByName, parallelism); + } else { + for (Source src : sourcesByName.values()) { + src.numChunk = src.getSrcVertexWithMostOutput().numTask; + } } - k = Math.pow(10, k / sourcesByName.size()); + // final parallelism will be product of all #chunk parallelism = 1; for (Source src : sourcesByName.values()) { - if (enableGrouping) { - src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions, - Math.max(1, (int) (src.numRecord * k))); - } else { - src.numChunk = src.getSrcVertexWithMostOutput().numTask; - } parallelism *= src.numChunk; } @@ -491,6 +486,43 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea return true; } + /** + * determine num chunk for each source by weighted factorization of initial parallelism + **/ + private void determineNumChunks(Map sourcesByName, int parallelism) { + // first round: set numChunk to 1 if source output is too small + double k = Math.log10(parallelism); + for (Source src : sourcesByName.values()) { + k -= Math.log10(src.numRecord); + } + k = Math.pow(10, k / sourcesByName.size()); + + for (Source src : sourcesByName.values()) { + if (src.numRecord * k < 2) { + src.numChunk = 1; + } + } + + // second round: weighted factorization + k = Math.log10(parallelism); + int numLargeSrc = 0; + for (Source src : sourcesByName.values()) { + if (src.numChunk != 1) { + k -= Math.log10(src.numRecord); + numLargeSrc++; + } + } + k = Math.pow(10, k / numLargeSrc); + + for (Source src : sourcesByName.values()) { + if (src.numChunk != 1) { + src.numChunk = Math.min(maxParallelism, + Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions, + Math.max(1, (int) (src.numRecord * k)))); + } + } + } + private void tryScheduleTasks() throws IOException { if (!vertexReconfigured && !tryReconfigure()) { return; http://git-wip-us.apache.org/repos/asf/tez/blob/7993156c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java index d088fd3..6219a15 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java @@ -497,4 +497,51 @@ public class TestFairCartesianProductVertexManager { verify(ctx, times(1)).reconfigureVertex( eq(6), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); } + + @Test(timeout = 5000) + public void testParallelismTwoSkewedSource() throws Exception { + setupDAGVertexOnly(100, 10000, 10, 10); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(15000, "v0", 0)); + + for (int i = 0; i < 30; i++) { + vertexManager.onVertexManagerEventReceived(getVMEvent(1, "v1", i)); + } + + verify(ctx, times(1)).reconfigureVertex( + eq(99), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, + new int[]{99, 1}, 100); + } + + @Test(timeout = 5000) + public void testParallelismThreeSkewedSource() throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3)); + setSrcParallelism(ctx, 10, 2, 3, 4); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1").addSources("v2") + .setMaxParallelism(100).setMinOpsPerWorker(10000) + .setNumPartitionsForFairCase(10); + vertexManager.initialize(builder.build()); + + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(60000, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(4000, "v1", 0)); + for (int i = 0; i < 40; i++) { + vertexManager.onVertexManagerEventReceived(getVMEvent(3, "v2", i)); + } + + verify(ctx, times(1)).reconfigureVertex( + eq(93), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1", "v2"}, + new int[]{31, 3, 1}, 100); + } } \ No newline at end of file