Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B32FB17F50 for ; Fri, 20 Mar 2015 10:07:39 +0000 (UTC) Received: (qmail 19921 invoked by uid 500); 20 Mar 2015 10:06:41 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 19822 invoked by uid 500); 20 Mar 2015 10:06:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 19748 invoked by uid 99); 20 Mar 2015 10:06:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 10:06:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DD04EE184A; Fri, 20 Mar 2015 10:06:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 20 Mar 2015 10:06:43 -0000 Message-Id: <70c86ecb35314a8f8e11ac5dc6e53ac8@git.apache.org> In-Reply-To: <901b614f7d9049c18a4f996bcd78556d@git.apache.org> References: <901b614f7d9049c18a4f996bcd78556d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler) http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java new file mode 100644 index 0000000..0273659 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial","unchecked"}) +public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase { + + @Test + public void testCompatiblePartitioningJoin() { + try { + final Partitioner partitioner = new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + }; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1.partitionCustom(partitioner, 1) + .join(input2.partitionCustom(partitioner, 0)) + .where(1).equalTo(0) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner1 = (SingleInputPlanNode) join.getInput1().getSource(); + SingleInputPlanNode partitioner2 = (SingleInputPlanNode) join.getInput2().getSource(); + + assertEquals(ShipStrategyType.FORWARD, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, join.getInput2().getShipStrategy()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy()); + assertEquals(partitioner, partitioner1.getInput().getPartitioner()); + assertEquals(partitioner, partitioner2.getInput().getPartitioner()); + + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCompatiblePartitioningCoGroup() { + try { + final Partitioner partitioner = new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + }; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1.partitionCustom(partitioner, 1) + .coGroup(input2.partitionCustom(partitioner, 0)) + .where(1).equalTo(0) + .with(new DummyCoGroupFunction, Tuple3>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner1 = (SingleInputPlanNode) coGroup.getInput1().getSource(); + SingleInputPlanNode partitioner2 = (SingleInputPlanNode) coGroup.getInput2().getSource(); + + assertEquals(ShipStrategyType.FORWARD, coGroup.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, coGroup.getInput2().getShipStrategy()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy()); + assertEquals(partitioner, partitioner1.getInput().getPartitioner()); + assertEquals(partitioner, partitioner2.getInput().getPartitioner()); + + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java new file mode 100644 index 0000000..08f7388 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CoGroupCustomPartitioningTest extends CompilerTestBase { + + @Test + public void testCoGroupWithTuples() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction, Tuple3>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithTuplesWrongType() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + try { + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithPojos() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .coGroup(input2) + .where("b").equalTo("a") + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithPojosWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .coGroup(input2) + .where("a").equalTo("b") + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithKeySelectors() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .coGroup(input2) + .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithKeySelectorsWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .coGroup(input2) + .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testIncompatibleHashAndCustomPartitioning() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input = env.fromElements(new Tuple3(0L, 0L, 0L)); + + DataSet> partitioned = input + .partitionCustom(new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { return 0; } + }, 0) + .map(new IdentityMapper>()).withForwardedFields("0", "1", "2"); + + + DataSet> grouped = partitioned + .distinct(0, 1) + .groupBy(1) + .sortGroup(0, Order.ASCENDING) + .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1"); + + grouped + .coGroup(partitioned).where(0).equalTo(0) + .with(new DummyCoGroupFunction, Tuple3>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy()); + assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || + coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } + + public static class Pojo2 { + public int a; + public int b; + } + + public static class Pojo3 { + public int a; + public int b; + public int c; + } + + private static class Pojo2KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo2 value) { + return value.a; + } + } + + private static class Pojo3KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo3 value) { + return value.b; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java new file mode 100644 index 0000000..9fd676f --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase { + + @Test + public void testJoinReduceCombination() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + DataSet> joined = input1.join(input2) + .where(1).equalTo(0) + .projectFirst(0, 1) + .>projectSecond(2) + .withPartitioner(partitioner); + + joined.groupBy(1).withPartitioner(partitioner) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + + assertTrue("Reduce is not chained, property reuse does not happen", + reducer.getInput().getSource() instanceof DualInputPlanNode); + + DualInputPlanNode join = (DualInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + + assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java new file mode 100644 index 0000000..d397ea2 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CustomPartitioningTest extends CompilerTestBase { + + @Test + public void testPartitionTuples() { + try { + final Partitioner part = new TestPartitionerInt(); + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance(); + + data + .partitionCustom(part, 0) + .mapPartition(new IdentityPartitionerMapper>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource(); + SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(parallelism, sink.getParallelism()); + + assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); + assertEquals(parallelism, mapper.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); + assertEquals(part, partitioner.getInput().getPartitioner()); + assertEquals(parallelism, partitioner.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); + assertEquals(parallelism, balancer.getParallelism()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionTuplesInvalidType() { + try { + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance(); + + try { + data + .partitionCustom(new TestPartitionerLong(), 0); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionPojo() { + try { + final Partitioner part = new TestPartitionerInt(); + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet data = env.fromElements(new Pojo()) + .rebalance(); + + data + .partitionCustom(part, "a") + .mapPartition(new IdentityPartitionerMapper()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource(); + SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(parallelism, sink.getParallelism()); + + assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); + assertEquals(parallelism, mapper.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); + assertEquals(part, partitioner.getInput().getPartitioner()); + assertEquals(parallelism, partitioner.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); + assertEquals(parallelism, balancer.getParallelism()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionPojoInvalidType() { + try { + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet data = env.fromElements(new Pojo()) + .rebalance(); + + try { + data + .partitionCustom(new TestPartitionerLong(), "a"); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionKeySelector() { + try { + final Partitioner part = new TestPartitionerInt(); + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet data = env.fromElements(new Pojo()) + .rebalance(); + + data + .partitionCustom(part, new TestKeySelectorInt()) + .mapPartition(new IdentityPartitionerMapper()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode keyRemover = (SingleInputPlanNode) mapper.getInput().getSource(); + SingleInputPlanNode partitioner = (SingleInputPlanNode) keyRemover.getInput().getSource(); + SingleInputPlanNode keyExtractor = (SingleInputPlanNode) partitioner.getInput().getSource(); + SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(parallelism, sink.getParallelism()); + + assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); + assertEquals(parallelism, mapper.getParallelism()); + + assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy()); + assertEquals(parallelism, keyRemover.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); + assertEquals(part, partitioner.getInput().getPartitioner()); + assertEquals(parallelism, partitioner.getParallelism()); + + assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy()); + assertEquals(parallelism, keyExtractor.getParallelism()); + + assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); + assertEquals(parallelism, balancer.getParallelism()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionKeySelectorInvalidType() { + try { + final Partitioner part = (Partitioner) (Partitioner) new TestPartitionerLong(); + final int parallelism = 4; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(parallelism); + + DataSet data = env.fromElements(new Pojo()) + .rebalance(); + + try { + data + .partitionCustom(part, new TestKeySelectorInt()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + public static class Pojo { + public int a; + public int b; + } + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } + + private static class TestKeySelectorInt implements KeySelector { + @Override + public Integer getKey(T value) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java new file mode 100644 index 0000000..360487b --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class GroupingKeySelectorTranslationTest extends CompilerTestBase { + + @Test + public void testCustomPartitioningKeySelectorReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(new TestKeySelector>()) + .withPartitioner(new TestPartitionerInt()) + .reduce(new DummyReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode keyRemovingMapper = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode reducer = (SingleInputPlanNode) keyRemovingMapper.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, keyRemovingMapper.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningKeySelectorGroupReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(new TestKeySelector>()) + .withPartitioner(new TestPartitionerInt()) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningKeySelectorGroupReduceSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(new TestKeySelector>()) + .withPartitioner(new TestPartitionerInt()) + .sortGroup(new TestKeySelector>(), Order.ASCENDING) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningKeySelectorInvalidType() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + try { + data + .groupBy(new TestKeySelector>()) + .withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningKeySelectorInvalidTypeSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + try { + data + .groupBy(new TestKeySelector>()) + .sortGroup(1, Order.ASCENDING) + .withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleRejectCompositeKey() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + try { + data + .groupBy(new TestBinaryKeySelector>()) + .withPartitioner(new TestPartitionerInt()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } + + private static class TestKeySelector implements KeySelector { + @Override + public Integer getKey(T value) { + return value.getField(0); + } + } + + private static class TestBinaryKeySelector implements KeySelector> { + @Override + public Tuple2 getKey(T value) { + return new Tuple2(value.getField(0), value.getField(1)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java new file mode 100644 index 0000000..8cd4809 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings("serial") +public class GroupingPojoTranslationTest extends CompilerTestBase { + + @Test + public void testCustomPartitioningTupleReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo2()) + .rebalance().setParallelism(4); + + data.groupBy("a").withPartitioner(new TestPartitionerInt()) + .reduce(new DummyReducer()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo2()) + .rebalance().setParallelism(4); + + data.groupBy("a").withPartitioner(new TestPartitionerInt()) + .reduceGroup(new IdentityGroupReducer()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduceSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo3()) + .rebalance().setParallelism(4); + + data.groupBy("a").withPartitioner(new TestPartitionerInt()) + .sortGroup("b", Order.ASCENDING) + .reduceGroup(new IdentityGroupReducer()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduceSorted2() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo4()) + .rebalance().setParallelism(4); + + data.groupBy("a").withPartitioner(new TestPartitionerInt()) + .sortGroup("b", Order.ASCENDING) + .sortGroup("c", Order.DESCENDING) + .reduceGroup(new IdentityGroupReducer()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleInvalidType() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo2()) + .rebalance().setParallelism(4); + + try { + data.groupBy("a").withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleInvalidTypeSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo3()) + .rebalance().setParallelism(4); + + try { + data.groupBy("a") + .sortGroup("b", Order.ASCENDING) + .withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleRejectCompositeKey() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet data = env.fromElements(new Pojo2()) + .rebalance().setParallelism(4); + + try { + data.groupBy("a", "b") + .withPartitioner(new TestPartitionerInt()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + public static class Pojo2 { + public int a; + public int b; + + } + + public static class Pojo3 { + public int a; + public int b; + public int c; + } + + public static class Pojo4 { + public int a; + public int b; + public int c; + public int d; + } + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java new file mode 100644 index 0000000..779b8e5 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class GroupingTupleTranslationTest extends CompilerTestBase { + + @Test + public void testCustomPartitioningTupleAgg() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(0).withPartitioner(new TestPartitionerInt()) + .sum(1) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(0).withPartitioner(new TestPartitionerInt()) + .reduce(new DummyReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduce() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(0).withPartitioner(new TestPartitionerInt()) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduceSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(0).withPartitioner(new TestPartitionerInt()) + .sortGroup(1, Order.ASCENDING) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleGroupReduceSorted2() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple4(0, 0, 0, 0)) + .rebalance().setParallelism(4); + + data.groupBy(0).withPartitioner(new TestPartitionerInt()) + .sortGroup(1, Order.ASCENDING) + .sortGroup(2, Order.DESCENDING) + .reduceGroup(new IdentityGroupReducer>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleInvalidType() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple2(0, 0)) + .rebalance().setParallelism(4); + + try { + data.groupBy(0).withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleInvalidTypeSorted() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + try { + data.groupBy(0) + .sortGroup(1, Order.ASCENDING) + .withPartitioner(new TestPartitionerLong()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCustomPartitioningTupleRejectCompositeKey() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = env.fromElements(new Tuple3(0, 0, 0)) + .rebalance().setParallelism(4); + + try { + data.groupBy(0, 1) + .withPartitioner(new TestPartitionerInt()); + fail("Should throw an exception"); + } + catch (InvalidProgramException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java new file mode 100644 index 0000000..eae40cf --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class JoinCustomPartitioningTest extends CompilerTestBase { + + @Test + public void testJoinWithTuples() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testJoinWithTuplesWrongType() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + try { + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0) + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testJoinWithPojos() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST) + .where("b").equalTo("a").withPartitioner(partitioner) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testJoinWithPojosWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST) + .where("a").equalTo("b") + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testJoinWithKeySelectors() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST) + .where(new Pojo2KeySelector()) + .equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testJoinWithKeySelectorsWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .join(input2, JoinHint.REPARTITION_HASH_FIRST) + .where(new Pojo2KeySelector()) + .equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testIncompatibleHashAndCustomPartitioning() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input = env.fromElements(new Tuple3(0L, 0L, 0L)); + + DataSet> partitioned = input + .partitionCustom(new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { return 0; } + }, 0) + .map(new IdentityMapper>()).withForwardedFields("0", "1", "2"); + + + DataSet> grouped = partitioned + .distinct(0, 1) + .groupBy(1) + .sortGroup(0, Order.ASCENDING) + .reduceGroup(new IdentityGroupReducer>()).withForwardedFields("0", "1"); + + grouped + .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) + .with(new DummyFlatJoinFunction>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy()); + assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || + coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } + + public static class Pojo2 { + public int a; + public int b; + } + + public static class Pojo3 { + public int a; + public int b; + public int c; + } + + private static class Pojo2KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo2 value) { + return value.a; + } + } + + private static class Pojo3KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo3 value) { + return value.b; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java new file mode 100644 index 0000000..cb4bd78 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataexchange; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; +import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper; +import org.apache.flink.optimizer.testfunctions.SelectOneReducer; +import org.apache.flink.optimizer.testfunctions.Top1GroupReducer; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.junit.Test; + +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * This test checks the correct assignment of the DataExchangeMode to + * connections for programs that branch, and re-join those branches. + * + *
+ *                                         /-> (sink)
+ *                                        /
+ *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
+ *                        /               \        /
+ *     (source) -> (map) -                (join) -+-----\
+ *                        \               /              \
+ *                         \-> (filter) -+                \
+ *                                       \                (co group) -> (sink)
+ *                                        \                /
+ *                                         \-> (reduce) - /
+ * 
+ */ +@SuppressWarnings("serial") +public class DataExchangeModeClosedBranchingTest extends CompilerTestBase { + + @Test + public void testPipelinedForced() { + // PIPELINED_FORCED should result in pipelining all the way + verifyBranchingJoiningPlan(ExecutionMode.PIPELINED_FORCED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, + DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED); + } + + @Test + public void testPipelined() { + // PIPELINED should result in pipelining all the way + verifyBranchingJoiningPlan(ExecutionMode.PIPELINED, + DataExchangeMode.PIPELINED, // to map + DataExchangeMode.PIPELINED, // to combiner connections are pipelined + DataExchangeMode.BATCH, // to reduce + DataExchangeMode.BATCH, // to filter + DataExchangeMode.PIPELINED, // to sink after reduce + DataExchangeMode.PIPELINED, // to join (first input) + DataExchangeMode.BATCH, // to join (second input) + DataExchangeMode.PIPELINED, // combiner connections are pipelined + DataExchangeMode.BATCH, // to other reducer + DataExchangeMode.PIPELINED, // to flatMap + DataExchangeMode.PIPELINED, // to sink after flatMap + DataExchangeMode.PIPELINED, // to coGroup (first input) + DataExchangeMode.PIPELINED, // to coGroup (second input) + DataExchangeMode.PIPELINED // to sink after coGroup + ); + } + + @Test + public void testBatch() { + // BATCH should result in batching the shuffle all the way + verifyBranchingJoiningPlan(ExecutionMode.BATCH, + DataExchangeMode.PIPELINED, // to map + DataExchangeMode.PIPELINED, // to combiner connections are pipelined + DataExchangeMode.BATCH, // to reduce + DataExchangeMode.BATCH, // to filter + DataExchangeMode.PIPELINED, // to sink after reduce + DataExchangeMode.BATCH, // to join (first input) + DataExchangeMode.BATCH, // to join (second input) + DataExchangeMode.PIPELINED, // combiner connections are pipelined + DataExchangeMode.BATCH, // to other reducer + DataExchangeMode.PIPELINED, // to flatMap + DataExchangeMode.PIPELINED, // to sink after flatMap + DataExchangeMode.BATCH, // to coGroup (first input) + DataExchangeMode.BATCH, // to coGroup (second input) + DataExchangeMode.PIPELINED // to sink after coGroup + ); + } + + @Test + public void testBatchForced() { + // BATCH_FORCED should result in batching all the way + verifyBranchingJoiningPlan(ExecutionMode.BATCH_FORCED, + DataExchangeMode.BATCH, // to map + DataExchangeMode.PIPELINED, // to combiner connections are pipelined + DataExchangeMode.BATCH, // to reduce + DataExchangeMode.BATCH, // to filter + DataExchangeMode.BATCH, // to sink after reduce + DataExchangeMode.BATCH, // to join (first input) + DataExchangeMode.BATCH, // to join (second input) + DataExchangeMode.PIPELINED, // combiner connections are pipelined + DataExchangeMode.BATCH, // to other reducer + DataExchangeMode.BATCH, // to flatMap + DataExchangeMode.BATCH, // to sink after flatMap + DataExchangeMode.BATCH, // to coGroup (first input) + DataExchangeMode.BATCH, // to coGroup (second input) + DataExchangeMode.BATCH // to sink after coGroup + ); + } + + private void verifyBranchingJoiningPlan(ExecutionMode execMode, + DataExchangeMode toMap, + DataExchangeMode toReduceCombiner, + DataExchangeMode toReduce, + DataExchangeMode toFilter, + DataExchangeMode toReduceSink, + DataExchangeMode toJoin1, + DataExchangeMode toJoin2, + DataExchangeMode toOtherReduceCombiner, + DataExchangeMode toOtherReduce, + DataExchangeMode toFlatMap, + DataExchangeMode toFlatMapSink, + DataExchangeMode toCoGroup1, + DataExchangeMode toCoGroup2, + DataExchangeMode toCoGroupSink) + { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setExecutionMode(execMode); + + DataSet> data = env.fromElements(33L, 44L) + .map(new MapFunction>() { + @Override + public Tuple2 map(Long value) { + return new Tuple2(value, value); + } + }); + + DataSet> reduced = data.groupBy(0).reduce(new SelectOneReducer>()); + reduced.output(new DiscardingOutputFormat>()).name("reduceSink"); + + DataSet> filtered = data.filter(new FilterFunction>() { + @Override + public boolean filter(Tuple2 value) throws Exception { + return false; + } + }); + + DataSet> joined = reduced.join(filtered) + .where(1).equalTo(1) + .with(new DummyFlatJoinFunction>()); + + joined.flatMap(new IdentityFlatMapper>()) + .output(new DiscardingOutputFormat>()).name("flatMapSink"); + + joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer>())) + .where(0).equalTo(0) + .with(new DummyCoGroupFunction, Tuple2>()) + .output(new DiscardingOutputFormat, Tuple2>>()).name("cgSink"); + + + OptimizedPlan optPlan = compileNoStats(env.createProgramPlan()); + + SinkPlanNode reduceSink = findSink(optPlan.getDataSinks(), "reduceSink"); + SinkPlanNode flatMapSink = findSink(optPlan.getDataSinks(), "flatMapSink"); + SinkPlanNode cgSink = findSink(optPlan.getDataSinks(), "cgSink"); + + DualInputPlanNode coGroupNode = (DualInputPlanNode) cgSink.getPredecessor(); + + DualInputPlanNode joinNode = (DualInputPlanNode) coGroupNode.getInput1().getSource(); + SingleInputPlanNode otherReduceNode = (SingleInputPlanNode) coGroupNode.getInput2().getSource(); + SingleInputPlanNode otherReduceCombinerNode = (SingleInputPlanNode) otherReduceNode.getPredecessor(); + + SingleInputPlanNode reduceNode = (SingleInputPlanNode) joinNode.getInput1().getSource(); + SingleInputPlanNode reduceCombinerNode = (SingleInputPlanNode) reduceNode.getPredecessor(); + assertEquals(reduceNode, reduceSink.getPredecessor()); + + SingleInputPlanNode filterNode = (SingleInputPlanNode) joinNode.getInput2().getSource(); + assertEquals(filterNode, otherReduceCombinerNode.getPredecessor()); + + SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor(); + assertEquals(mapNode, reduceCombinerNode.getPredecessor()); + + SingleInputPlanNode flatMapNode = (SingleInputPlanNode) flatMapSink.getPredecessor(); + assertEquals(joinNode, flatMapNode.getPredecessor()); + + // verify the data exchange modes + + assertEquals(toReduceSink, reduceSink.getInput().getDataExchangeMode()); + assertEquals(toFlatMapSink, flatMapSink.getInput().getDataExchangeMode()); + assertEquals(toCoGroupSink, cgSink.getInput().getDataExchangeMode()); + + assertEquals(toCoGroup1, coGroupNode.getInput1().getDataExchangeMode()); + assertEquals(toCoGroup2, coGroupNode.getInput2().getDataExchangeMode()); + + assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode()); + assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode()); + + assertEquals(toOtherReduce, otherReduceNode.getInput().getDataExchangeMode()); + assertEquals(toOtherReduceCombiner, otherReduceCombinerNode.getInput().getDataExchangeMode()); + + assertEquals(toFlatMap, flatMapNode.getInput().getDataExchangeMode()); + + assertEquals(toFilter, filterNode.getInput().getDataExchangeMode()); + assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode()); + assertEquals(toReduceCombiner, reduceCombinerNode.getInput().getDataExchangeMode()); + + assertEquals(toMap, mapNode.getInput().getDataExchangeMode()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private SinkPlanNode findSink(Collection collection, String name) { + for (SinkPlanNode node : collection) { + String nodeName = node.getOptimizerNode().getOperator().getName(); + if (nodeName != null && nodeName.equals(name)) { + return node; + } + } + + throw new IllegalArgumentException("No node with that name was found."); + } +}