From: mxm@apache.org
To: commits@flink.apache.org
Date: Mon, 23 Mar 2015 08:09:25 -0000
Message-Id: <8afbcf518b4d4b32bc6cafb42e5f1ac5@git.apache.org>
Subject: [5/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java index 916aa27..2df08a0 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java @@ -79,7 +79,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSet source = env.generateSequence(1, 10000); @@ -120,7 +120,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchingWithMultipleDataSinks2() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSet source = env.generateSequence(1, 10000); @@ -184,7 +184,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchingSourceMultipleTimes() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSet> source = env.generateSequence(1, 10000000) .map(new Duplicator()); @@ -267,7 +267,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchingWithMultipleDataSinks() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + 
env.setParallelism(DEFAULT_PARALLELISM); DataSet> sourceA = env.generateSequence(1, 10000000) .map(new Duplicator()); @@ -815,7 +815,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testIterationWithStaticInput() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet source = env.generateSequence(1, 1000000); @@ -842,7 +842,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testBranchingBroadcastVariable() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet input1 = env.readTextFile(IN_FILE).name("source1"); DataSet input2 = env.readTextFile(IN_FILE).name("source2"); @@ -914,7 +914,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testMultipleIterations() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet input = env.readTextFile(IN_FILE).name("source1"); @@ -943,7 +943,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { @Test public void testMultipleIterationsWithClosueBCVars() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet input = env.readTextFile(IN_FILE).name("source1"); @@ -970,7 +970,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchesOnlyInBCVariables1() { try{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet input = env.generateSequence(1, 10); DataSet bc_input = env.generateSequence(1, 10); @@ -993,7 +993,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { public void testBranchesOnlyInBCVariables2() { try{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); + env.setParallelism(100); DataSet> input = env.generateSequence(1, 10).map(new Duplicator()).name("proper input"); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java index 3e7da6c..47efeb1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java @@ -203,7 +203,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { private Plan getTestPlanRightStatic(String strategy) { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSet> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile"); @@ -231,7 +231,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { private Plan getTestPlanLeftStatic(String strategy) { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); @SuppressWarnings("unchecked") DataSet> bigInput = env.fromElements(new Tuple3(1L, 2L, 3L), http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java index 565d992..4eed236 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java @@ -71,10 +71,10 @@ public abstract class CompilerTestBase implements java.io.Serializable { public void setup() { this.dataStats = new DataStatistics(); this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator()); - this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); + this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator()); - this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); + this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java deleted file mode 100644 index b17e777..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.optimizer; - -import org.junit.Assert; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.types.IntValue; -import org.apache.flink.util.Visitor; -import org.junit.Test; - -/** - * Tests in this class: - *
- * <ul>
- *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- *   parallelism between tasks is increased or decreased.
- * </ul>
- */ -@SuppressWarnings({"serial", "deprecation"}) -public class DOPChangeTest extends CompilerTestBase { - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable. - * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network - * transit as well. - */ - @Test - public void checkPropertyHandlingWithIncreasingGlobalParallelism1() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType redIn = red2Node.getInput().getShipStrategy(); - - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn); - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn); - } - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable. - * Expected to re-establish partitioning between map and reduce (hash). 
- */ - @Test - public void checkPropertyHandlingWithIncreasingGlobalParallelism2() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); - - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn); - Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn); - } - - /** - * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). - * - * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance. - * Expected to re-establish partitioning between map and reduce via a local hash. 
- */ - @Test - public void checkPropertyHandlingWithIncreasingLocalParallelism() { - final int degOfPar = 2 * DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar * 2); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar * 2); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar * 2); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); - ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); - - Assert.assertTrue("Invalid ship strategy for an operator.", - (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || - (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn)); - } - - - - @Test - public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() { - final int degOfPar = DEFAULT_PARALLELISM; - - // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - source.setDegreeOfParallelism(degOfPar * 2); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); - map1.setDegreeOfParallelism(degOfPar * 2); - map1.setInput(source); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); - reduce1.setDegreeOfParallelism(degOfPar * 2); - reduce1.setInput(map1); - - MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); - map2.setDegreeOfParallelism(degOfPar); - map2.setInput(reduce1); - - ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); - reduce2.setDegreeOfParallelism(degOfPar); - reduce2.setInput(map2); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setDegreeOfParallelism(degOfPar); - sink.setInput(reduce2); - - Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism"); - - // submit the plan to the compiler - OptimizedPlan oPlan = compileNoStats(plan); - - // check the optimized Plan - // when reducer 1 
distributes its data across the instances of map2, it needs to employ a local hash method, - // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same - // mapper respectively reducer - SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); - SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); - SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); - - Assert.assertTrue("The no sorting local strategy.", - LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() || - LocalStrategy.SORT == map2Node.getInput().getLocalStrategy()); - - Assert.assertTrue("The no partitioning ship strategy.", - ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() || - ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy()); - } - - /** - * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs. - * - * Test Plan: - *
-	 * <pre>
-	 * (source) -> reduce -\
-	 *                      Match -> (sink)
-	 * (source) -> reduce -/
-	 * </pre>
-	 * 
- * - */ - @Test - public void checkPropertyHandlingWithTwoInputs() { - // construct the plan - - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - - ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceA) - .build(); - ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceB) - .build(); - - JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(redA) - .input2(redB) - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat); - - sourceA.setDegreeOfParallelism(5); - sourceB.setDegreeOfParallelism(7); - redA.setDegreeOfParallelism(5); - redB.setDegreeOfParallelism(7); - - mat.setDegreeOfParallelism(5); - - sink.setDegreeOfParallelism(5); - - - // return the PACT plan - Plan plan = new Plan(sink, "Partition on DoP Change"); - - OptimizedPlan oPlan = compileNoStats(plan); - - JobGraphGenerator jobGen = new JobGraphGenerator(); - - //Compile plan to verify that no error is thrown - jobGen.compileJobGraph(oPlan); - - oPlan.accept(new Visitor() { - - @Override - public boolean preVisit(PlanNode visitable) { - if (visitable instanceof DualInputPlanNode) { - DualInputPlanNode node = (DualInputPlanNode) visitable; - Channel c1 = node.getInput1(); - Channel c2 = node.getInput2(); - - Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy()); - Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy()); - return false; - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) { - // DO NOTHING - } - }); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java index 34aa9f8..3b7eae7 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java @@ -42,7 +42,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io public void testDistinctPlain() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -77,7 +77,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(0, 1), combineNode.getKeys(0)); assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, combineNode.getParallelism()); assertEquals(8, reduceNode.getParallelism()); @@ -94,7 +94,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io public void testDistinctWithSelectorFunctionKey() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + 
env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -135,7 +135,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(0), combineNode.getKeys(0)); assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, keyExtractor.getParallelism()); assertEquals(6, combineNode.getParallelism()); @@ -155,7 +155,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io public void testDistinctWithFieldPositionKeyCombinable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -191,7 +191,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(1), combineNode.getKeys(0)); assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, combineNode.getParallelism()); assertEquals(8, reduceNode.getParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java index ac4f820..810ec0e 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java @@ -91,7 +91,7 @@ public class IterationsCompilerTest extends CompilerTestBase { public void testTwoIterationsWithMapperInbetween() throws Exception { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> verticesWithInitialId = env.fromElements(new Tuple2(1L, 2L)); @@ -129,7 +129,7 @@ public class IterationsCompilerTest extends CompilerTestBase { public void testTwoIterationsDirectlyChained() throws Exception { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> verticesWithInitialId = env.fromElements(new Tuple2(1L, 2L)); @@ -165,7 +165,7 @@ public class IterationsCompilerTest extends CompilerTestBase { public void testTwoWorksetIterationsDirectlyChained() throws Exception { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> verticesWithInitialId = env.fromElements(new Tuple2(1L, 2L)); @@ -201,7 +201,7 @@ public class IterationsCompilerTest extends CompilerTestBase { public void testIterationPushingWorkOut() throws Exception { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); @@ -235,7 +235,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase { public void testWorksetIterationPipelineBreakerPlacement() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); // the workset (input two of the delta iteration) is the same as what is consumed be the successive join DataSet> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java new file mode 100644 index 0000000..a54136a --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.optimizer; + +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.DummyInputFormat; +import org.apache.flink.optimizer.util.DummyMatchStub; +import org.apache.flink.optimizer.util.DummyOutputFormat; +import org.apache.flink.optimizer.util.IdentityMap; +import org.apache.flink.optimizer.util.IdentityReduce; +import org.junit.Assert; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.record.operators.FileDataSink; +import org.apache.flink.api.java.record.operators.FileDataSource; +import org.apache.flink.api.java.record.operators.JoinOperator; +import org.apache.flink.api.java.record.operators.MapOperator; +import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.util.LocalStrategy; +import org.apache.flink.types.IntValue; +import org.apache.flink.util.Visitor; +import org.junit.Test; + +/** + * Tests in this class: + *
+ * <ul>
+ *   <li>Tests that check the correct handling of the properties and strategies in the case where the
+ *   parallelism between tasks is increased or decreased.
+ * </ul>
+ */ +@SuppressWarnings({"serial", "deprecation"}) +public class ParallelismChangeTest extends CompilerTestBase { + + /** + * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). + * + * Increases parallelism between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable. + * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network + * transit as well. + */ + @Test + public void checkPropertyHandlingWithIncreasingGlobalParallelism1() { + final int degOfPar = DEFAULT_PARALLELISM; + + // construct the plan + FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); + source.setParallelism(degOfPar); + + MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); + map1.setParallelism(degOfPar); + map1.setInput(source); + + ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); + reduce1.setParallelism(degOfPar); + reduce1.setInput(map1); + + MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); + map2.setParallelism(degOfPar * 2); + map2.setInput(reduce1); + + ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); + reduce2.setParallelism(degOfPar * 2); + reduce2.setInput(map2); + + FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); + sink.setParallelism(degOfPar * 2); + sink.setInput(reduce2); + + Plan plan = new Plan(sink, "Test Increasing parallelism"); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, + // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same + // mapper respectively reducer + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); + ShipStrategyType redIn = red2Node.getInput().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn); + } + + /** + * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). + * + * Increases parallelism between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable. + * Expected to re-establish partitioning between map and reduce (hash). 
+ */ + @Test + public void checkPropertyHandlingWithIncreasingGlobalParallelism2() { + final int degOfPar = DEFAULT_PARALLELISM; + + // construct the plan + FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); + source.setParallelism(degOfPar); + + MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); + map1.setParallelism(degOfPar); + map1.setInput(source); + + ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); + reduce1.setParallelism(degOfPar); + reduce1.setInput(map1); + + MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); + map2.setParallelism(degOfPar); + map2.setInput(reduce1); + + ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); + reduce2.setParallelism(degOfPar * 2); + reduce2.setInput(map2); + + FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); + sink.setParallelism(degOfPar * 2); + sink.setInput(reduce2); + + Plan plan = new Plan(sink, "Test Increasing parallelism"); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, + // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same + // mapper respectively reducer + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); + ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); + + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn); + Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn); + } + + /** + * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties). + * + * Increases parallelism between 1st reduce and 2nd map, such that more tasks are on one instance. + * Expected to re-establish partitioning between map and reduce via a local hash. 
+ */ + @Test + public void checkPropertyHandlingWithIncreasingLocalParallelism() { + final int degOfPar = 2 * DEFAULT_PARALLELISM; + + // construct the plan + FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); + source.setParallelism(degOfPar); + + MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); + map1.setParallelism(degOfPar); + map1.setInput(source); + + ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); + reduce1.setParallelism(degOfPar); + reduce1.setInput(map1); + + MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); + map2.setParallelism(degOfPar * 2); + map2.setInput(reduce1); + + ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); + reduce2.setParallelism(degOfPar * 2); + reduce2.setInput(map2); + + FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); + sink.setParallelism(degOfPar * 2); + sink.setInput(reduce2); + + Plan plan = new Plan(sink, "Test Increasing parallelism"); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, + // because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same + // mapper respectively reducer + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + ShipStrategyType mapIn = map2Node.getInput().getShipStrategy(); + ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy(); + + Assert.assertTrue("Invalid ship strategy for an operator.", + (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || + (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn)); + } + + + + @Test + public void checkPropertyHandlingWithDecreasingParallelism() { + final int degOfPar = DEFAULT_PARALLELISM; + + // construct the plan + FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); + source.setParallelism(degOfPar * 2); + + MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build(); + map1.setParallelism(degOfPar * 2); + map1.setInput(source); + + ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build(); + reduce1.setParallelism(degOfPar * 2); + reduce1.setInput(map1); + + MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build(); + map2.setParallelism(degOfPar); + map2.setInput(reduce1); + + ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build(); + reduce2.setParallelism(degOfPar); + reduce2.setInput(map2); + + FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); + sink.setParallelism(degOfPar); + sink.setInput(reduce2); + + Plan plan = new Plan(sink, "Test Increasing parallelism"); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + // when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method, + // because map2 has twice as many 
instances and key/value pairs with the same key need to be processed by the same + // mapper respectively reducer + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor(); + SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor(); + + Assert.assertTrue("The no sorting local strategy.", + LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() || + LocalStrategy.SORT == map2Node.getInput().getLocalStrategy()); + + Assert.assertTrue("The no partitioning ship strategy.", + ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() || + ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy()); + } + + /** + * Checks that re-partitioning happens when the inputs of a two-input contract have different parallelisms. + * + * Test Plan: + *
+	 * <pre>
+	 * (source) -> reduce -\
+	 *                      Match -> (sink)
+	 * (source) -> reduce -/
+	 * </pre>
+	 * 
+ * + */ + @Test + public void checkPropertyHandlingWithTwoInputs() { + // construct the plan + + FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); + FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); + + ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) + .input(sourceA) + .build(); + ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) + .input(sourceB) + .build(); + + JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) + .input1(redA) + .input2(redB) + .build(); + + FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat); + + sourceA.setParallelism(5); + sourceB.setParallelism(7); + redA.setParallelism(5); + redB.setParallelism(7); + + mat.setParallelism(5); + + sink.setParallelism(5); + + + // return the PACT plan + Plan plan = new Plan(sink, "Partition on DoP Change"); + + OptimizedPlan oPlan = compileNoStats(plan); + + JobGraphGenerator jobGen = new JobGraphGenerator(); + + //Compile plan to verify that no error is thrown + jobGen.compileJobGraph(oPlan); + + oPlan.accept(new Visitor() { + + @Override + public boolean preVisit(PlanNode visitable) { + if (visitable instanceof DualInputPlanNode) { + DualInputPlanNode node = (DualInputPlanNode) visitable; + Channel c1 = node.getInput1(); + Channel c2 = node.getInput2(); + + Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy()); + Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy()); + return false; + } + return true; + } + + @Override + public void postVisit(PlanNode visitable) { + // DO NOTHING + } + }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java index 86f01b0..31f71d1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java @@ -41,7 +41,7 @@ public class PipelineBreakerTest extends CompilerTestBase { public void testPipelineBreakerWithBroadcastVariable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet source = env.generateSequence(1, 10).map(new IdentityMapper()); @@ -69,7 +69,7 @@ public class PipelineBreakerTest extends CompilerTestBase { public void testPipelineBreakerBroadcastedAllReduce() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper()); @@ -103,7 +103,7 @@ public class PipelineBreakerTest extends CompilerTestBase { public void testPipelineBreakerBroadcastedPartialSolution() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet initialSource = env.generateSequence(1, 10); @@ -144,7 +144,7 @@ public class PipelineBreakerTest extends CompilerTestBase { try { { ExecutionEnvironment env 
= ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet initialSource = env.generateSequence(1, 10); @@ -166,7 +166,7 @@ public class PipelineBreakerTest extends CompilerTestBase { { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet initialSource = env.generateSequence(1, 10); @@ -189,7 +189,7 @@ public class PipelineBreakerTest extends CompilerTestBase { { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet initialSource = env.generateSequence(1, 10); @@ -212,7 +212,7 @@ public class PipelineBreakerTest extends CompilerTestBase { { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(64); + env.setParallelism(64); DataSet initialSource = env.generateSequence(1, 10); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java index 7be2b16..3cf081f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java @@ -55,7 +55,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource1() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -88,7 +88,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource2() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -121,7 +121,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource3() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -153,7 +153,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource4() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -185,7 +185,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource5() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -217,7 +217,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void 
checkSinglePartitionedSource6() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -249,7 +249,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedSource7() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -283,7 +283,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource1() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -317,7 +317,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource2() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -352,7 +352,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource3() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -386,7 +386,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource4() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -420,7 +420,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource5() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -454,7 +454,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource6() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -488,7 +488,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource7() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -521,7 +521,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedGroupedSource8() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - 
env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -555,7 +555,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource1() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -589,7 +589,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource2() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -624,7 +624,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource3() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -658,7 +658,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource4() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -692,7 +692,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource5() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -726,7 +726,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource6() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -759,7 +759,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkSinglePartitionedOrderedSource7() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data = env.fromCollection(tuple3PojoData, tuple3PojoType); @@ -793,7 +793,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkCoPartitionedSources1() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data1 = env.readCsvFile("/some/path").types(Long.class, String.class); @@ -841,7 +841,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { public void checkCoPartitionedSources2() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); DataSource> data1 = 
env.readCsvFile("/some/path").types(Long.class, String.class); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java index da44b59..fd451f7 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java @@ -53,7 +53,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInput() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -89,7 +89,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInputBehindMap() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -126,7 +126,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInputBehindFilter() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -163,7 +163,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInputBehindFlatMap() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -200,7 +200,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInputBehindMapPartition() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -237,7 +237,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkJoinWithReplicatedSourceInputBehindMultiMaps() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -277,7 +277,7 @@ public class ReplicatingDataSourceTest extends 
CompilerTestBase { public void checkCrossWithReplicatedSourceInput() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -313,7 +313,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { public void checkCrossWithReplicatedSourceInputBehindMap() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -345,13 +345,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { } /** - * Tests compiler fail for join program with replicated data source and changing DOP. + * Tests compiler fail for join program with replicated data source and changing parallelism. */ @Test(expected = CompilerException.class) - public void checkJoinWithReplicatedSourceInputChangingDOP() { + public void checkJoinWithReplicatedSourceInputChangingParallelism() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -370,13 +370,13 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { } /** - * Tests compiler fail for join program with replicated data source behind map and changing DOP. + * Tests compiler fail for join program with replicated data source behind map and changing parallelism.
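 * For illustration, a minimal sketch of the pattern this test exercises ({@code IdentityMapper}
 * and the second input {@code other} are assumed helpers, not code from this commit):
 * <pre>{@code
 * DataSet<Tuple1<String>> replicated =
 *     env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 * replicated.map(new IdentityMapper<Tuple1<String>>())
 *           .setParallelism(DEFAULT_PARALLELISM * 2)  // parallelism changes behind the replicated source
 *           .join(other).where(0).equalTo(0)
 *           .writeAsText(OUT_FILE);
 * // compiling this plan must throw a CompilerException
 * }</pre>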
*/ @Test(expected = CompilerException.class) - public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() { + public void checkJoinWithReplicatedSourceInputBehindMapChangingParallelism() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -402,7 +402,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { @Test(expected = CompilerException.class) public void checkJoinWithReplicatedSourceInputBehindReduce() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); @@ -427,7 +427,7 @@ public class ReplicatingDataSourceTest extends CompilerTestBase { @Test(expected = CompilerException.class) public void checkJoinWithReplicatedSourceInputBehindRebalance() { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); ReplicatingInputFormat, FileInputSplit> rif = new ReplicatingInputFormat, FileInputSplit>(new CsvInputFormat>(new Path("/some/path"), String.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java index d397ea2..f865a9f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java @@ -45,7 +45,7 @@ public class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet> data = env.fromElements(new Tuple2(0, 0)) .rebalance(); @@ -88,7 +88,7 @@ public class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet> data = env.fromElements(new Tuple2(0, 0)) .rebalance(); @@ -115,7 +115,7 @@ public class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet data = env.fromElements(new Pojo()) .rebalance(); @@ -158,7 +158,7 @@ public class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet data = env.fromElements(new Pojo()) .rebalance(); @@ -185,7 +185,7 @@ public
class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet data = env.fromElements(new Pojo()) .rebalance(); @@ -237,7 +237,7 @@ public class CustomPartitioningTest extends CompilerTestBase { final int parallelism = 4; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(parallelism); + env.setParallelism(parallelism); DataSet data = env.fromElements(new Pojo()) .rebalance(); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java index de02836..2f9b32f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java @@ -39,7 +39,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase { public void testDistinctPreservesPartitioningOfDistinctFields() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(4); + env.setParallelism(4); @SuppressWarnings("unchecked") DataSet> data = env.fromElements(new Tuple2(0L, 0L), new Tuple2(1L, 1L)) @@ -75,7 +75,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase { public void testDistinctDestroysPartitioningOfNonDistinctFields() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(4); + env.setParallelism(4); @SuppressWarnings("unchecked") DataSet> data = env.fromElements(new Tuple2(0L, 0L), new Tuple2(1L, 1L)) http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java index a683968..c0e2fa7 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -44,7 +44,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testAllGroupReduceNoCombiner() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); @@ -59,7 +59,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); - // the all-reduce has no combiner, when the DOP of the input is one + // the all-reduce has no combiner, when the parallelism of the input is one SourcePlanNode sourceNode = resolver.getNode("source"); SingleInputPlanNode reduceNode = resolver.getNode("reducer"); @@ 
-72,7 +72,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java // check that reduce has the right strategy assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy()); - // check DOP + // check parallelism assertEquals(1, sourceNode.getParallelism()); assertEquals(1, reduceNode.getParallelism()); assertEquals(1, sinkNode.getParallelism()); @@ -88,7 +88,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testAllReduceWithCombiner() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet data = env.generateSequence(1, 8000000).name("source"); @@ -120,7 +120,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy()); assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy()); - // check DOP + // check parallelism assertEquals(8, sourceNode.getParallelism()); assertEquals(8, combineNode.getParallelism()); assertEquals(1, reduceNode.getParallelism()); @@ -138,7 +138,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testGroupedReduceWithFieldPositionKeyNonCombinable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -171,7 +171,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(1), reduceNode.getKeys(0)); assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(8, reduceNode.getParallelism()); assertEquals(8, sinkNode.getParallelism()); @@ -187,7 +187,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testGroupedReduceWithFieldPositionKeyCombinable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -228,7 +228,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(1), combineNode.getKeys(1)); assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, combineNode.getParallelism()); assertEquals(8, reduceNode.getParallelism()); @@ -245,7 +245,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -284,7 +284,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(0), reduceNode.getKeys(0)); assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); - // 
check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, keyExtractor.getParallelism()); @@ -303,7 +303,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public void testGroupedReduceWithSelectorFunctionKeyCombinable() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -350,7 +350,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(0), combineNode.getKeys(1)); assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, keyExtractor.getParallelism()); assertEquals(6, combineNode.getParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java index 37a8e81..57d2d54 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java @@ -45,7 +45,7 @@ public class IterationCompilerTest extends CompilerTestBase { public void testIdentityIteration() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(43); + env.setParallelism(43); IterativeDataSet iteration = env.generateSequence(-4, 1000).iterate(100); iteration.closeWith(iteration).print(); @@ -65,7 +65,7 @@ public class IterationCompilerTest extends CompilerTestBase { public void testEmptyWorksetIteration() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(43); + env.setParallelism(43); DataSet> input = env.generateSequence(1, 20) .map(new MapFunction>() { @@ -93,7 +93,7 @@ public class IterationCompilerTest extends CompilerTestBase { public void testIterationWithUnionRoot() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(43); + env.setParallelism(43); IterativeDataSet iteration = env.generateSequence(-4, 1000).iterate(100); @@ -132,7 +132,7 @@ public class IterationCompilerTest extends CompilerTestBase { public void testWorksetIterationWithUnionRoot() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(43); + env.setParallelism(43); DataSet> input = env.generateSequence(1, 20) .map(new MapFunction>() { http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java index 0724a9f..e1b18f9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java +++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -42,7 +42,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S public void testAllReduceNoCombiner() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); @@ -61,7 +61,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); - // the all-reduce has no combiner, when the DOP of the input is one + // the all-reduce has no combiner, when the parallelism of the input is one SourcePlanNode sourceNode = resolver.getNode("source"); SingleInputPlanNode reduceNode = resolver.getNode("reducer"); @@ -71,7 +71,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(sourceNode, reduceNode.getInput().getSource()); assertEquals(reduceNode, sinkNode.getInput().getSource()); - // check DOP + // check parallelism assertEquals(1, sourceNode.getParallelism()); assertEquals(1, reduceNode.getParallelism()); assertEquals(1, sinkNode.getParallelism()); @@ -87,7 +87,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S public void testAllReduceWithCombiner() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet data = env.generateSequence(1, 8000000).name("source"); @@ -121,7 +121,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy()); assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy()); - // check DOP + // check parallelism assertEquals(8, sourceNode.getParallelism()); assertEquals(8, combineNode.getParallelism()); assertEquals(1, reduceNode.getParallelism()); @@ -138,7 +138,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S public void testGroupedReduceWithFieldPositionKey() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -179,7 +179,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(new FieldList(1), combineNode.getKeys(0)); assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, combineNode.getParallelism()); assertEquals(8, reduceNode.getParallelism()); @@ -196,7 +196,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S public void testGroupedReduceWithSelectorFunctionKey() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(8); + env.setParallelism(8); DataSet> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); @@ -243,7 +243,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(new FieldList(0), combineNode.getKeys(0)); assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); - // check DOP + // 
check parallelism assertEquals(6, sourceNode.getParallelism()); assertEquals(6, keyExtractor.getParallelism()); assertEquals(6, combineNode.getParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java index 8720aa7..f1c2233 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java @@ -196,7 +196,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); @SuppressWarnings("unchecked") DataSet> solutionSetInput = env.fromElements(new Tuple3(1L, 2L, 3L)).name("Solution Set"); @@ -245,7 +245,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + env.setParallelism(DEFAULT_PARALLELISM); @SuppressWarnings("unchecked") DataSet> solutionSetInput = env.fromElements(new Tuple3(1L, 2L, 3L)).name("Solution Set"); http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 91d01a2..d0615b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -71,7 +71,7 @@ import static akka.dispatch.Futures.future; * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes * from the JobGraph's corresponding JobVertex. *
  • The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are - * as many ExecutionVertices as the degree of parallelism. The ExecutionVertex is identified by + * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by * the ExecutionJobVertex and the number of the parallel subtask
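 * For example, an ExecutionJobVertex with a parallelism of 100 contains one ExecutionVertex for each parallel subtask, indexed 0 through 99.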
  • *
• The {@link Execution} is one attempt to execute an ExecutionVertex. There may be multiple Executions * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java index 8816a69..c948155 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java @@ -194,22 +194,22 @@ public class AbstractJobVertex implements java.io.Serializable { } /** - * Gets the degree of parallelism of the task. + * Gets the parallelism of the task. * - * @return The degree of parallelism of the task. + * @return The parallelism of the task. */ public int getParallelism() { return parallelism; } /** - * Sets the degree of parallelism for the task. + * Sets the parallelism for the task. * - * @param parallelism The degree of parallelism for the task. + * @param parallelism The parallelism for the task. */ public void setParallelism(int parallelism) { if (parallelism < 1) { - throw new IllegalArgumentException("The degree of parallelism must be at least one."); + throw new IllegalArgumentException("The parallelism must be at least one."); } this.parallelism = parallelism; } http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java index fb32a6e..47b1b96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/ShipStrategyType.java @@ -36,7 +36,7 @@ public enum ShipStrategyType { FORWARD(false, false), /** - * Repartitioning the data randomly, typically when the degree of parallelism between two nodes changes. + * Repartitioning the data randomly, typically when the parallelism between two nodes changes.
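 * For example, when a source with a parallelism of 2 feeds a mapper with a parallelism of 4, each of the two producing subtasks spreads its records evenly over the four consuming subtasks.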
*/ PARTITION_RANDOM(true, false), http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index dfe6b50..f0001a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -86,7 +86,7 @@ public class LocalInputSplitsTest { new TestLocatableInputSplit(3, "host3") }; - // This should fail with an exception, since the DOP of 2 does not + // This should fail with an exception, since the parallelism of 2 does not // support strictly local assignment onto 3 hosts try { runTests(numHosts, slotsPerHost, parallelism, splits);