Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 03C6310281 for ; Tue, 25 Nov 2014 14:40:56 +0000 (UTC) Received: (qmail 23686 invoked by uid 500); 25 Nov 2014 14:40:56 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 23657 invoked by uid 500); 25 Nov 2014 14:40:55 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 23648 invoked by uid 99); 25 Nov 2014 14:40:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2014 14:40:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 25 Nov 2014 14:40:51 +0000 Received: (qmail 23064 invoked by uid 99); 25 Nov 2014 14:40:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2014 14:40:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 47EFEA19746; Tue, 25 Nov 2014 14:40:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.incubator.apache.org Message-Id: <83a1df1a8aa5418ba72fbf8a38284154@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-flink git commit: [FLINK-1249] [APIs] [compiler] Add custom partitioner for CoGroup Date: Tue, 25 Nov 2014 14:40:31 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-flink Updated Branches: refs/heads/master 4838efe56 -> bcdd167f4 [FLINK-1249] [APIs] [compiler] Add custom partitioner for CoGroup This closes #228. Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bcdd167f Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bcdd167f Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bcdd167f Branch: refs/heads/master Commit: bcdd167f4671976a92ad9300256874804598dd3e Parents: 4838efe Author: Stephan Ewen Authored: Mon Nov 24 20:01:21 2014 +0100 Committer: Till Rohrmann Committed: Tue Nov 25 15:31:56 2014 +0100 ---------------------------------------------------------------------- .../apache/flink/compiler/dag/CoGroupNode.java | 16 +- .../compiler/operators/CoGroupDescriptor.java | 21 +- ...naryCustomPartitioningCompatibilityTest.java | 48 +++- .../CoGroupCustomPartitioningTest.java | 267 +++++++++++++++++++ .../operators/CoGroupCompatibilityTest.java | 161 ----------- ...oGroupGlobalPropertiesCompatibilityTest.java | 161 +++++++++++ .../CoGroupOnConflictingPartitioningsTest.java | 67 +++++ .../testfunctions/DummyCoGroupFunction.java | 31 +++ .../operators/base/CoGroupOperatorBase.java | 23 +- .../api/java/operators/CoGroupOperator.java | 85 +++++- .../apache/flink/api/scala/coGroupDataSet.scala | 33 ++- .../CoGroupCustomPartitioningTest.scala | 226 ++++++++++++++++ .../JoinCustomPartitioningTest.scala | 26 -- .../translation/PartitioningTestClasses.scala | 54 ++++ 14 files changed, 1004 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java index 5081442..689dc6f 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java @@ -21,6 +21,7 @@ package org.apache.flink.compiler.dag; import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.compiler.DataStatistics; @@ -36,9 +37,9 @@ public class CoGroupNode extends TwoInputNode { private List dataProperties; - public CoGroupNode(CoGroupOperatorBase pactContract) { - super(pactContract); - this.dataProperties = initializeDataProperties(); + public CoGroupNode(CoGroupOperatorBase operator) { + super(operator); + this.dataProperties = initializeDataProperties(operator.getCustomPartitioner()); } // -------------------------------------------------------------------------------------------- @@ -80,7 +81,7 @@ public class CoGroupNode extends TwoInputNode { // for CoGroup, we currently make no reasonable default estimates } - private List initializeDataProperties() { + private List initializeDataProperties(Partitioner customPartitioner) { Ordering groupOrder1 = null; Ordering groupOrder2 = null; @@ -95,6 +96,11 @@ public class CoGroupNode extends TwoInputNode { groupOrder2 = null; } - return Collections.singletonList(new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2)); + CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2); + if (customPartitioner != null) { + descr.setCustomPartitioner(customPartitioner); + } + + return Collections.singletonList(descr); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java index 90e4c3b..14f40f3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java @@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators; import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.util.FieldList; @@ -41,6 +42,8 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { private final Ordering ordering1; // ordering on the first input private final Ordering ordering2; // ordering on the second input + private Partitioner customPartitioner; + public CoGroupDescriptor(FieldList keys1, FieldList keys2) { this(keys1, keys2, null, null); @@ -84,6 +87,10 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { } } + public void setCustomPartitioner(Partitioner customPartitioner) { + this.customPartitioner = customPartitioner; + } + @Override public DriverStrategy getStrategy() { return DriverStrategy.CO_GROUP; @@ -92,9 +99,19 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { @Override protected List createPossibleGlobalProperties() { RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties(); - partitioned1.setHashPartitioned(this.keys1); + if (this.customPartitioner == null) { + partitioned1.setAnyPartitioning(this.keys1); + } else { + partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner); + } + RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties(); - partitioned2.setHashPartitioned(this.keys2); + if (this.customPartitioner == null) { + partitioned2.setAnyPartitioning(this.keys2); + } else { + partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner); + } + return Collections.singletonList(new GlobalPropertiesPair(partitioned1, partitioned2)); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java index ff3d78e..1f87dfb 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java @@ -32,6 +32,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan; import org.apache.flink.compiler.plan.SingleInputPlanNode; import org.apache.flink.compiler.plan.SinkPlanNode; import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -39,7 +40,7 @@ import org.junit.Test; public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase { @Test - public void testCompatiblePartitioning() { + public void testCompatiblePartitioningJoin() { try { final Partitioner partitioner = new Partitioner() { @Override @@ -81,4 +82,49 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase fail(e.getMessage()); } } + + @Test + public void testCompatiblePartitioningCoGroup() { + try { + final Partitioner partitioner = new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + }; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1.partitionCustom(partitioner, 1) + .coGroup(input2.partitionCustom(partitioner, 0)) + .where(1).equalTo(0) + .with(new DummyCoGroupFunction, Tuple3>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner1 = (SingleInputPlanNode) coGroup.getInput1().getSource(); + SingleInputPlanNode partitioner2 = (SingleInputPlanNode) coGroup.getInput2().getSource(); + + assertEquals(ShipStrategyType.FORWARD, coGroup.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, coGroup.getInput2().getShipStrategy()); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy()); + assertEquals(partitioner, partitioner1.getInput().getPartitioner()); + assertEquals(partitioner, partitioner2.getInput().getPartitioner()); + + new NepheleJobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java new file mode 100644 index 0000000..c79e365 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.custompartition; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.compiler.CompilerTestBase; +import org.apache.flink.compiler.plan.DualInputPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.SinkPlanNode; +import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CoGroupCustomPartitioningTest extends CompilerTestBase { + + @Test + public void testCoGroupWithTuples() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction, Tuple3>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithTuplesWrongType() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); + DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); + + try { + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithPojos() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .coGroup(input2) + .where("b").equalTo("a") + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithPojosWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .coGroup(input2) + .where("a").equalTo("b") + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithKeySelectors() { + try { + final Partitioner partitioner = new TestPartitionerInt(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + input1 + .coGroup(input2) + .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner) + .with(new DummyCoGroupFunction()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource(); + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy()); + assertEquals(partitioner, join.getInput1().getPartitioner()); + assertEquals(partitioner, join.getInput2().getPartitioner()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCoGroupWithKeySelectorsWrongType() { + try { + final Partitioner partitioner = new TestPartitionerLong(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.fromElements(new Pojo2()); + DataSet input2 = env.fromElements(new Pojo3()); + + try { + input1 + .coGroup(input2) + .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) + .withPartitioner(partitioner); + + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + + private static class TestPartitionerInt implements Partitioner { + @Override + public int partition(Integer key, int numPartitions) { + return 0; + } + } + + private static class TestPartitionerLong implements Partitioner { + @Override + public int partition(Long key, int numPartitions) { + return 0; + } + } + + public static class Pojo2 { + public int a; + public int b; + } + + public static class Pojo3 { + public int a; + public int b; + public int c; + } + + private static class Pojo2KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo2 value) { + return value.a; + } + } + + private static class Pojo3KeySelector implements KeySelector { + @Override + public Integer getKey(Pojo3 value) { + return value.b; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java deleted file mode 100644 index ae65094..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.compiler.operators; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.compiler.dataproperties.GlobalProperties; -import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; -import org.junit.Test; - -@SuppressWarnings("serial") -public class CoGroupCompatibilityTest { - - @Test - public void checkCompatiblePartitionings() { - try { - final FieldList keysLeft = new FieldList(1, 4); - final FieldList keysRight = new FieldList(3, 1); - - CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); - - // test compatible hash partitioning - { - RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); - reqLeft.setHashPartitioned(keysLeft); - RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); - reqRight.setHashPartitioned(keysRight); - - GlobalProperties propsLeft = new GlobalProperties(); - propsLeft.setHashPartitioned(keysLeft); - GlobalProperties propsRight = new GlobalProperties(); - propsRight.setHashPartitioned(keysRight); - - assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); - } - - // test compatible custom partitioning - { - Partitioner part = new Partitioner() { - @Override - public int partition(Object key, int numPartitions) { - return 0; - } - }; - - RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); - reqLeft.setCustomPartitioned(keysLeft, part); - RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); - reqRight.setCustomPartitioned(keysRight, part); - - GlobalProperties propsLeft = new GlobalProperties(); - propsLeft.setCustomPartitioned(keysLeft, part); - GlobalProperties propsRight = new GlobalProperties(); - propsRight.setCustomPartitioned(keysRight, part); - - assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); - } - - // test custom partitioning matching any partitioning - { - Partitioner part = new Partitioner() { - @Override - public int partition(Object key, int numPartitions) { - return 0; - } - }; - - RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); - reqLeft.setAnyPartitioning(keysLeft); - RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); - reqRight.setAnyPartitioning(keysRight); - - GlobalProperties propsLeft = new GlobalProperties(); - propsLeft.setCustomPartitioned(keysLeft, part); - GlobalProperties propsRight = new GlobalProperties(); - propsRight.setCustomPartitioned(keysRight, part); - - assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void checkInompatiblePartitionings() { - try { - final FieldList keysLeft = new FieldList(1); - final FieldList keysRight = new FieldList(3); - - final Partitioner part = new Partitioner() { - @Override - public int partition(Object key, int numPartitions) { - return 0; - } - }; - final Partitioner part2 = new Partitioner() { - @Override - public int partition(Object key, int numPartitions) { - return 0; - } - }; - - CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); - - // test incompatible hash with custom partitioning - { - RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); - reqLeft.setAnyPartitioning(keysLeft); - RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); - reqRight.setAnyPartitioning(keysRight); - - GlobalProperties propsLeft = new GlobalProperties(); - propsLeft.setHashPartitioned(keysLeft); - GlobalProperties propsRight = new GlobalProperties(); - propsRight.setCustomPartitioned(keysRight, part); - - assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); - } - - // test incompatible custom partitionings - { - RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); - reqLeft.setAnyPartitioning(keysLeft); - RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); - reqRight.setAnyPartitioning(keysRight); - - GlobalProperties propsLeft = new GlobalProperties(); - propsLeft.setCustomPartitioned(keysLeft, part); - GlobalProperties propsRight = new GlobalProperties(); - propsRight.setCustomPartitioned(keysRight, part2); - - assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java new file mode 100644 index 0000000..668932c --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; +import org.junit.Test; + +@SuppressWarnings("serial") +public class CoGroupGlobalPropertiesCompatibilityTest { + + @Test + public void checkCompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1, 4); + final FieldList keysRight = new FieldList(3, 1); + + CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); + + // test compatible hash partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setHashPartitioned(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setHashPartitioned(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setHashPartitioned(keysRight); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test compatible custom partitioning + { + Partitioner part = new Partitioner() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setCustomPartitioned(keysLeft, part); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setCustomPartitioned(keysRight, part); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test custom partitioning matching any partitioning + { + Partitioner part = new Partitioner() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void checkInompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1); + final FieldList keysRight = new FieldList(3); + + final Partitioner part = new Partitioner() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + final Partitioner part2 = new Partitioner() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); + + // test incompatible hash with custom partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test incompatible custom partitionings + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part2); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java new file mode 100644 index 0000000..7effce8 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.compiler.CompilerException; +import org.apache.flink.compiler.CompilerTestBase; +import org.apache.flink.compiler.PactCompiler; +import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction; +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase { + + @Test + public void testRejectCoGroupOnHashAndRangePartitioning() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> input = env.fromElements(new Tuple2(0L, 0L)); + + Configuration cfg = new Configuration(); + cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH); + cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE); + + input.coGroup(input).where(0).equalTo(0) + .with(new DummyCoGroupFunction, Tuple2>()) + .withParameters(cfg) + .print(); + + Plan p = env.createProgramPlan(); + try { + compileNoStats(p); + fail("This should fail with an exception"); + } + catch (CompilerException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java new file mode 100644 index 0000000..cb2ba9a --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.testfunctions; + +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +public class DummyCoGroupFunction extends RichCoGroupFunction> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable first, Iterable second, Collector> out) {} +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java index bca909f..7fe46eb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; @@ -51,21 +52,19 @@ import java.util.List; */ public class CoGroupOperatorBase> extends DualInputOperator { - /** - * The ordering for the order inside a group from input one. - */ + /** The ordering for the order inside a group from input one. */ private Ordering groupOrder1; - /** - * The ordering for the order inside a group from input two. - */ + /** The ordering for the order inside a group from input two. */ private Ordering groupOrder2; - - // -------------------------------------------------------------------------------------------- + + private Partitioner customPartitioner; private boolean combinableFirst; private boolean combinableSecond; + + // -------------------------------------------------------------------------------------------- public CoGroupOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) { super(udf, operatorInfo, keyPositions1, keyPositions2, name); @@ -175,6 +174,14 @@ public class CoGroupOperatorBase customPartitioner) { + this.customPartitioner = customPartitioner; + } + + public Partitioner getCustomPartitioner() { + return customPartitioner; + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 1034e86..7394c18 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -23,6 +23,7 @@ import java.security.InvalidParameterException; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -62,17 +63,21 @@ public class CoGroupOperator extends TwoInputUdfOperator keys2; private final String defaultName; + + private Partitioner customPartitioner; public CoGroupOperator(DataSet input1, DataSet input2, Keys keys1, Keys keys2, CoGroupFunction function, TypeInformation returnType, + Partitioner customPartitioner, String defaultName) { super(input1, input2, returnType); this.function = function; + this.customPartitioner = customPartitioner; this.defaultName = defaultName; if (keys1 == null || keys2 == null) { @@ -110,6 +115,34 @@ public class CoGroupOperator extends TwoInputUdfOperator getKeys2() { return this.keys2; } + + /** + * Sets a custom partitioner for the CoGroup operation. The partitioner will be called on the join keys to determine + * the partition a key should be assigned to. The partitioner is evaluated on both inputs in the + * same way. + *

+ * NOTE: A custom partitioner can only be used with single-field CoGroup keys, not with composite CoGroup keys. + * + * @param partitioner The custom partitioner to be used. + * @return This CoGroup operator, to allow for function chaining. + */ + public CoGroupOperator withPartitioner(Partitioner partitioner) { + if (partitioner != null) { + keys1.validateCustomPartitioner(partitioner, null); + keys2.validateCustomPartitioner(partitioner, null); + } + this.customPartitioner = partitioner; + return this; + } + + /** + * Gets the custom partitioner used by this join, or {@code null}, if none is set. + * + * @return The custom partitioner used by this join; + */ + public Partitioner getPartitioner() { + return customPartitioner; + } @Override protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase translateToDataFlow(Operator input1, Operator input2) { @@ -135,8 +168,8 @@ public class CoGroupOperator extends TwoInputUdfOperator extends TwoInputUdfOperator extends TwoInputUdfOperator extends TwoInputUdfOperator extends TwoInputUdfOperator keys2; + + private Partitioner customPartitioner; private CoGroupOperatorWithoutFunction(Keys keys2) { if (keys2 == null) { @@ -508,6 +542,34 @@ public class CoGroupOperator extends TwoInputUdfOperator + * NOTE: A custom partitioner can only be used with single-field CoGroup keys, not with composite CoGroup keys. + * + * @param partitioner The custom partitioner to be used. + * @return This CoGroup operator, to allow for function chaining. + */ + public CoGroupOperatorWithoutFunction withPartitioner(Partitioner partitioner) { + if (partitioner != null) { + keys1.validateCustomPartitioner(partitioner, null); + keys2.validateCustomPartitioner(partitioner, null); + } + this.customPartitioner = partitioner; + return this; + } + + /** + * Gets the custom partitioner used by this join, or {@code null}, if none is set. + * + * @return The custom partitioner used by this join; + */ + public Partitioner getPartitioner() { + return customPartitioner; + } /** * Finalizes a CoGroup transformation by applying a {@link org.apache.flink.api.common.functions.RichCoGroupFunction} to groups of elements with identical keys.
@@ -524,7 +586,8 @@ public class CoGroupOperator extends TwoInputUdfOperator returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType()); - return new CoGroupOperator(input1, input2, keys1, keys2, function, returnType, Utils.getCallLocationName()); + return new CoGroupOperator(input1, input2, keys1, keys2, function, returnType, + customPartitioner, Utils.getCallLocationName()); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala index 1fd9f9f..da71b6d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala @@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.util.Collector - import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.flink.api.common.functions.Partitioner /** @@ -65,6 +65,8 @@ class CoGroupDataSet[L, R]( rightKeys: Keys[R]) extends DataSet(defaultCoGroup) { + var customPartitioner : Partitioner[_] = _ + /** * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the * result of the given function. @@ -84,8 +86,10 @@ class CoGroupDataSet[L, R]( rightKeys, coGrouper, implicitly[TypeInformation[O]], + customPartitioner, getCallLocationName()) + wrap(coGroupOperator) } @@ -109,6 +113,7 @@ class CoGroupDataSet[L, R]( rightKeys, coGrouper, implicitly[TypeInformation[O]], + customPartitioner, getCallLocationName()) wrap(coGroupOperator) @@ -131,10 +136,35 @@ class CoGroupDataSet[L, R]( rightKeys, coGrouper, implicitly[TypeInformation[O]], + customPartitioner, getCallLocationName()) wrap(coGroupOperator) } + + // ---------------------------------------------------------------------------------------------- + // Properties + // ---------------------------------------------------------------------------------------------- + + def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : CoGroupDataSet[L, R] = { + if (partitioner != null) { + val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]] + + leftKeys.validateCustomPartitioner(partitioner, typeInfo) + rightKeys.validateCustomPartitioner(partitioner, typeInfo) + } + this.customPartitioner = partitioner + defaultCoGroup.withPartitioner(partitioner) + + this + } + + /** + * Gets the custom partitioner used by this join, or null, if none is set. + */ + def getPartitioner[K]() : Partitioner[K] = { + customPartitioner.asInstanceOf[Partitioner[K]] + } } /** @@ -194,6 +224,7 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( } val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])]( leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType, + null, // partitioner getCallLocationName()) new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala new file mode 100644 index 0000000..1c6afba --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.operators.translation + +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.api.common.functions.Partitioner +import org.apache.flink.api.scala._ +import org.apache.flink.test.compiler.util.CompilerTestBase +import org.apache.flink.runtime.operators.shipping.ShipStrategyType +import org.apache.flink.compiler.plan.SingleInputPlanNode +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.compiler.plan.DualInputPlanNode + +class CoGroupCustomPartitioningTest extends CompilerTestBase { + + @Test + def testCoGroupWithTuples() { + try { + val partitioner = new TestPartitionerLong() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements( (0L, 0L) ) + val input2 = env.fromElements( (0L, 0L, 0L) ) + + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner) + .print() + + val p = env.createProgramPlan() + val op = compileNoStats(p) + + val sink = op.getDataSinks.iterator().next() + val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode] + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy) + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy) + assertEquals(partitioner, join.getInput1.getPartitioner) + assertEquals(partitioner, join.getInput2.getPartitioner) + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + + @Test + def testCoGroupWithTuplesWrongType() { + try { + val partitioner = new TestPartitionerInt() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements( (0L, 0L) ) + val input2 = env.fromElements( (0L, 0L, 0L) ) + + try { + input1 + .coGroup(input2) + .where(1).equalTo(0) + .withPartitioner(partitioner) + fail("should throw an exception") + } + catch { + case e: InvalidProgramException => + } + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + + @Test + def testCoGroupWithPojos() { + try { + val partitioner = new TestPartitionerInt() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements(new Pojo2()) + val input2 = env.fromElements(new Pojo3()) + + input1 + .coGroup(input2) + .where("b").equalTo("a") + .withPartitioner(partitioner) + .print() + + val p = env.createProgramPlan() + val op = compileNoStats(p) + + val sink = op.getDataSinks.iterator().next() + val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode] + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy) + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy) + assertEquals(partitioner, join.getInput1.getPartitioner) + assertEquals(partitioner, join.getInput2.getPartitioner) + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + + @Test + def testCoGroupWithPojosWrongType() { + try { + val partitioner = new TestPartitionerLong() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements(new Pojo2()) + val input2 = env.fromElements(new Pojo3()) + + try { + input1 + .coGroup(input2) + .where("a").equalTo("b") + .withPartitioner(partitioner) + fail("should throw an exception") + } + catch { + case e: InvalidProgramException => + } + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + + @Test + def testCoGroupWithKeySelectors() { + try { + val partitioner = new TestPartitionerInt() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements(new Pojo2()) + val input2 = env.fromElements(new Pojo3()) + + input1 + .coGroup(input2) + .where( _.a ).equalTo( _.b ) + .withPartitioner(partitioner) + .print() + + val p = env.createProgramPlan() + val op = compileNoStats(p) + + val sink = op.getDataSinks.iterator().next() + val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode] + + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy) + assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy) + assertEquals(partitioner, join.getInput1.getPartitioner) + assertEquals(partitioner, join.getInput2.getPartitioner) + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } + + @Test + def testCoGroupWithKeySelectorsWrongType() { + try { + val partitioner = new TestPartitionerLong() + + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements(new Pojo2()) + val input2 = env.fromElements(new Pojo3()) + + try { + input1 + .coGroup(input2) + .where( _.a ).equalTo( _.b ) + .withPartitioner(partitioner) + fail("should throw an exception") + } + catch { + case e: InvalidProgramException => + } + } + catch { + case e: Exception => { + e.printStackTrace() + fail(e.getMessage) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala index debd48d..8cb49b8 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala @@ -223,30 +223,4 @@ class JoinCustomPartitioningTest extends CompilerTestBase { } } } - - - // ---------------------------------------------------------------------------------------------- - - private class TestPartitionerInt extends Partitioner[Int] { - - override def partition(key: Int, numPartitions: Int): Int = 0 - } - - private class TestPartitionerLong extends Partitioner[Long] { - - override def partition(key: Long, numPartitions: Int): Int = 0 - } - - class Pojo2 { - - var a: Int = _ - var b: Int = _ - } - - class Pojo3 { - - var a: Int = _ - var b: Int = _ - var c: Int = _ - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala new file mode 100644 index 0000000..bcf1869 --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.operators.translation + +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.api.common.functions.Partitioner +import org.apache.flink.api.scala._ +import org.apache.flink.test.compiler.util.CompilerTestBase +import org.apache.flink.runtime.operators.shipping.ShipStrategyType +import org.apache.flink.compiler.plan.SingleInputPlanNode +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.compiler.plan.DualInputPlanNode + + + +class TestPartitionerInt extends Partitioner[Int] { + override def partition(key: Int, numPartitions: Int): Int = 0 +} + +class TestPartitionerLong extends Partitioner[Long] { + override def partition(key: Long, numPartitions: Int): Int = 0 +} + +class Pojo2 { + + var a: Int = _ + var b: Int = _ +} + +class Pojo3 { + + var a: Int = _ + var b: Int = _ + var c: Int = _ +}