flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject incubator-flink git commit: [FLINK-1249] [APIs] [compiler] Add custom partitioner for CoGroup
Date Tue, 25 Nov 2014 14:40:31 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 4838efe56 -> bcdd167f4


[FLINK-1249] [APIs] [compiler] Add custom partitioner for CoGroup

This closes #228.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bcdd167f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bcdd167f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bcdd167f

Branch: refs/heads/master
Commit: bcdd167f4671976a92ad9300256874804598dd3e
Parents: 4838efe
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Nov 24 20:01:21 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 25 15:31:56 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/compiler/dag/CoGroupNode.java  |  16 +-
 .../compiler/operators/CoGroupDescriptor.java   |  21 +-
 ...naryCustomPartitioningCompatibilityTest.java |  48 +++-
 .../CoGroupCustomPartitioningTest.java          | 267 +++++++++++++++++++
 .../operators/CoGroupCompatibilityTest.java     | 161 -----------
 ...oGroupGlobalPropertiesCompatibilityTest.java | 161 +++++++++++
 .../CoGroupOnConflictingPartitioningsTest.java  |  67 +++++
 .../testfunctions/DummyCoGroupFunction.java     |  31 +++
 .../operators/base/CoGroupOperatorBase.java     |  23 +-
 .../api/java/operators/CoGroupOperator.java     |  85 +++++-
 .../apache/flink/api/scala/coGroupDataSet.scala |  33 ++-
 .../CoGroupCustomPartitioningTest.scala         | 226 ++++++++++++++++
 .../JoinCustomPartitioningTest.scala            |  26 --
 .../translation/PartitioningTestClasses.scala   |  54 ++++
 14 files changed, 1004 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
index 5081442..689dc6f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CoGroupNode.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.dag;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.compiler.DataStatistics;
@@ -36,9 +37,9 @@ public class CoGroupNode extends TwoInputNode {
 	
 	private List<OperatorDescriptorDual> dataProperties;
 	
-	public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> pactContract) {
-		super(pactContract);
-		this.dataProperties = initializeDataProperties();
+	public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) {
+		super(operator);
+		this.dataProperties = initializeDataProperties(operator.getCustomPartitioner());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -80,7 +81,7 @@ public class CoGroupNode extends TwoInputNode {
 		// for CoGroup, we currently make no reasonable default estimates
 	}
 	
-	private List<OperatorDescriptorDual> initializeDataProperties() {
+	private List<OperatorDescriptorDual> initializeDataProperties(Partitioner<?> customPartitioner) {
 		Ordering groupOrder1 = null;
 		Ordering groupOrder2 = null;
 		
@@ -95,6 +96,11 @@ public class CoGroupNode extends TwoInputNode {
 			groupOrder2 = null;
 		}
 		
-		return Collections.<OperatorDescriptorDual>singletonList(new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2));
+		CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2);
+		if (customPartitioner != null) {
+			descr.setCustomPartitioner(customPartitioner);
+		}
+		
+		return Collections.<OperatorDescriptorDual>singletonList(descr);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index 90e4c3b..14f40f3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
@@ -41,6 +42,8 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	private final Ordering ordering1;		// ordering on the first input 
 	private final Ordering ordering2;		// ordering on the second input 
 	
+	private Partitioner<?> customPartitioner;
+	
 	
 	public CoGroupDescriptor(FieldList keys1, FieldList keys2) {
 		this(keys1, keys2, null, null);
@@ -84,6 +87,10 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 		}
 	}
 	
+	public void setCustomPartitioner(Partitioner<?> customPartitioner) {
+		this.customPartitioner = customPartitioner;
+	}
+	
 	@Override
 	public DriverStrategy getStrategy() {
 		return DriverStrategy.CO_GROUP;
@@ -92,9 +99,19 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	@Override
 	protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
 		RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
-		partitioned1.setHashPartitioned(this.keys1);
+		if (this.customPartitioner == null) {
+			partitioned1.setAnyPartitioning(this.keys1);
+		} else {
+			partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
+		}
+		
 		RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
-		partitioned2.setHashPartitioned(this.keys2);
+		if (this.customPartitioner == null) {
+			partitioned2.setAnyPartitioning(this.keys2);
+		} else {
+			partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
+		}
+		
 		return Collections.singletonList(new GlobalPropertiesPair(partitioned1, partitioned2));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index ff3d78e..1f87dfb 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
@@ -39,7 +40,7 @@ import org.junit.Test;
 public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase {
 
 	@Test
-	public void testCompatiblePartitioning() {
+	public void testCompatiblePartitioningJoin() {
 		try {
 			final Partitioner<Long> partitioner = new Partitioner<Long>() {
 				@Override
@@ -81,4 +82,49 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testCompatiblePartitioningCoGroup() {
+		try {
+			final Partitioner<Long> partitioner = new Partitioner<Long>() {
+				@Override
+				public int partition(Long key, int numPartitions) {
+					return 0;
+				}
+			};
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input1.partitionCustom(partitioner, 1)
+				.coGroup(input2.partitionCustom(partitioner, 0))
+				.where(1).equalTo(0)
+				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner1 = (SingleInputPlanNode) coGroup.getInput1().getSource();
+			SingleInputPlanNode partitioner2 = (SingleInputPlanNode) coGroup.getInput2().getSource();
+
+			assertEquals(ShipStrategyType.FORWARD, coGroup.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, coGroup.getInput2().getShipStrategy());
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy());
+			assertEquals(partitioner, partitioner1.getInput().getPartitioner());
+			assertEquals(partitioner, partitioner2.getInput().getPartitioner());
+			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
new file mode 100644
index 0000000..c79e365
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupCustomPartitioningTest extends CompilerTestBase {
+	// withPartitioner(...) must ship both CoGroup inputs via PARTITION_CUSTOM; key-type mismatches must be rejected.
+	@Test
+	public void testCoGroupWithTuples() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input1
+				.coGroup(input2)
+				.where(1).equalTo(0)
+				.withPartitioner(partitioner)
+				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput2().getShipStrategy());
+			assertEquals(partitioner, coGroup.getInput1().getPartitioner());
+			assertEquals(partitioner, coGroup.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCoGroupWithTuplesWrongType() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			try {
+				input1
+					.coGroup(input2)
+					.where(1).equalTo(0)
+					.withPartitioner(partitioner);
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCoGroupWithPojos() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			input1
+				.coGroup(input2)
+				.where("b").equalTo("a")
+				.withPartitioner(partitioner)
+				.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput2().getShipStrategy());
+			assertEquals(partitioner, coGroup.getInput1().getPartitioner());
+			assertEquals(partitioner, coGroup.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCoGroupWithPojosWrongType() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			try {
+				input1
+					.coGroup(input2)
+					.where("a").equalTo("b")
+					.withPartitioner(partitioner);
+				
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCoGroupWithKeySelectors() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			input1
+				.coGroup(input2)
+				.where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector())
+				.withPartitioner(partitioner)
+				.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, coGroup.getInput2().getShipStrategy());
+			assertEquals(partitioner, coGroup.getInput1().getPartitioner());
+			assertEquals(partitioner, coGroup.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCoGroupWithKeySelectorsWrongType() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			try {
+				input1
+					.coGroup(input2)
+					.where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector())
+					.withPartitioner(partitioner);
+				
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Fixtures: constant partitioners (always return partition 0), POJOs, and Integer key selectors.
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	public static class Pojo2 {
+		public int a;
+		public int b;
+	}
+	
+	public static class Pojo3 {
+		public int a;
+		public int b;
+		public int c;
+	}
+	
+	private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> {
+		@Override
+		public Integer getKey(Pojo2 value) {
+			return value.a;
+		}
+	}
+	
+	private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> {
+		@Override
+		public Integer getKey(Pojo3 value) {
+			return value.b;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
deleted file mode 100644
index ae65094..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.compiler.operators;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.compiler.dataproperties.GlobalProperties;
-import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class CoGroupCompatibilityTest {
-
-	@Test
-	public void checkCompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1, 4);
-			final FieldList keysRight = new FieldList(3, 1);
-			
-			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-			
-			// test compatible hash partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setHashPartitioned(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setHashPartitioned(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setHashPartitioned(keysRight);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test compatible custom partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setCustomPartitioned(keysLeft, part);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setCustomPartitioned(keysRight, part);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test custom partitioning matching any partitioning
-			{
-				Partitioner<Object> part = new Partitioner<Object>() {
-					@Override
-					public int partition(Object key, int numPartitions) {
-						return 0;
-					}
-				};
-				
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void checkInompatiblePartitionings() {
-		try {
-			final FieldList keysLeft = new FieldList(1);
-			final FieldList keysRight = new FieldList(3);
-			
-			final Partitioner<Object> part = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			final Partitioner<Object> part2 = new Partitioner<Object>() {
-				@Override
-				public int partition(Object key, int numPartitions) {
-					return 0;
-				}
-			};
-			
-			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
-			
-			// test incompatible hash with custom partitioning
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setHashPartitioned(keysLeft);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-			
-			// test incompatible custom partitionings
-			{
-				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
-				reqLeft.setAnyPartitioning(keysLeft);
-				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
-				reqRight.setAnyPartitioning(keysRight);
-				
-				GlobalProperties propsLeft = new GlobalProperties();
-				propsLeft.setCustomPartitioned(keysLeft, part);
-				GlobalProperties propsRight = new GlobalProperties();
-				propsRight.setCustomPartitioned(keysRight, part2);
-				
-				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java
new file mode 100644
index 0000000..668932c
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupGlobalPropertiesCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupGlobalPropertiesCompatibilityTest {
+	// Exercises CoGroupDescriptor.areCompatible(...) on pairs of requested vs. actual global properties.
+	@Test
+	public void checkCompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1, 4);
+			final FieldList keysRight = new FieldList(3, 1);
+			
+			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+			
+			// test compatible hash partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setHashPartitioned(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setHashPartitioned(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setHashPartitioned(keysRight);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test compatible custom partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setCustomPartitioned(keysLeft, part);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setCustomPartitioned(keysRight, part);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test custom partitioning matching any partitioning
+			{
+				Partitioner<Object> part = new Partitioner<Object>() {
+					@Override
+					public int partition(Object key, int numPartitions) {
+						return 0;
+					}
+				};
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void checkIncompatiblePartitionings() {
+		try {
+			final FieldList keysLeft = new FieldList(1);
+			final FieldList keysRight = new FieldList(3);
+			
+			final Partitioner<Object> part = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			final Partitioner<Object> part2 = new Partitioner<Object>() {
+				@Override
+				public int partition(Object key, int numPartitions) {
+					return 0;
+				}
+			};
+			
+			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+			
+			// test incompatible hash with custom partitioning
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setHashPartitioned(keysLeft);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			
+			// test incompatible custom partitionings (two distinct partitioner instances, even with identical logic)
+			{
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setAnyPartitioning(keysLeft);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setAnyPartitioning(keysRight);
+				
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setCustomPartitioned(keysLeft, part);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setCustomPartitioned(keysRight, part2);
+				
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java
new file mode 100644
index 0000000..7effce8
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+/**
+ * Tests that the compiler rejects a CoGroup whose two inputs are forced onto conflicting
+ * partitionings through ship-strategy hints.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
+
+	/**
+	 * Requests a hash repartitioning for the first input and a range repartitioning for the
+	 * second input of a CoGroup. Compiling the plan is expected to fail with a
+	 * {@link CompilerException}.
+	 */
+	@Test
+	public void testRejectCoGroupOnHashAndRangePartitioning() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			
+			// ship-strategy hints, handed to the operator via withParameters(): hash for input 1, range for input 2
+			Configuration cfg = new Configuration();
+			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+			cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+			
+			// the same data set feeds both sides; only the requested partitionings differ
+			input.coGroup(input).where(0).equalTo(0)
+				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+				.withParameters(cfg)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("This should fail with an exception");
+			}
+			catch (CompilerException e) {
+				// expected: the compiler must reject the conflicting partitioning hints
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java
new file mode 100644
index 0000000..cb2ba9a
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyCoGroupFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * A no-op {@link RichCoGroupFunction} for compiler tests. It lets test programs contain a
+ * CoGroup without any real user logic; it never emits a record.
+ *
+ * @param <L> The type of the first input.
+ * @param <R> The type of the second input.
+ */
+public class DummyCoGroupFunction<L, R> extends RichCoGroupFunction<L, R, Tuple2<L, R>> {
+	
+	private static final long serialVersionUID = 1L;
+
+	// Intentionally empty: the declared output type is Tuple2<L, R>, but nothing is ever collected.
+	@Override
+	public void coGroup(Iterable<L> first, Iterable<R> second, Collector<Tuple2<L, R>> out) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index bca909f..7fe46eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -51,21 +52,19 @@ import java.util.List;
  */
 public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
 
-	/**
-	 * The ordering for the order inside a group from input one.
-	 */
+	/** The ordering for the order inside a group from input one. */
 	private Ordering groupOrder1;
 
-	/**
-	 * The ordering for the order inside a group from input two.
-	 */
+	/** The ordering for the order inside a group from input two. */
 	private Ordering groupOrder2;
-
-	// --------------------------------------------------------------------------------------------
+	
+	/** The custom partitioner for the CoGroup keys, or {@code null} if none is set. */
+	private Partitioner<?> customPartitioner;
 
 	private boolean combinableFirst;
 
 	private boolean combinableSecond;
+	
+	// --------------------------------------------------------------------------------------------
 
 	public CoGroupOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
@@ -175,6 +174,14 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 	public void setCombinableSecond(boolean combinableSecond) {
 		this.combinableSecond = combinableSecond;
 	}
+	
+	/**
+	 * Sets the custom partitioner for this CoGroup. The value is only stored here and handed out
+	 * by {@link #getCustomPartitioner()}; no validation is performed.
+	 * 
+	 * @param customPartitioner The custom partitioner, or {@code null} for none.
+	 */
+	public void setCustomPartitioner(Partitioner<?> customPartitioner) {
+		this.customPartitioner = customPartitioner;
+	}
+	
+	/**
+	 * Gets the custom partitioner set for this CoGroup.
+	 * 
+	 * @return The custom partitioner, or {@code null} if none is set.
+	 */
+	public Partitioner<?> getCustomPartitioner() {
+		return customPartitioner;
+	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 1034e86..7394c18 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -23,6 +23,7 @@ import java.security.InvalidParameterException;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -62,17 +63,21 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	private final Keys<I2> keys2;
 	
 	private final String defaultName;
+	
+	/** Custom partitioner copied onto the translated CoGroupOperatorBase; {@code null} if none. */
+	private Partitioner<?> customPartitioner;
 
 
 	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
 							Keys<I1> keys1, Keys<I2> keys2,
 							CoGroupFunction<I1, I2, OUT> function,
 							TypeInformation<OUT> returnType,
+							Partitioner<?> customPartitioner,
 							String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
+		this.customPartitioner = customPartitioner;
 		this.defaultName = defaultName;
 
 		if (keys1 == null || keys2 == null) {
@@ -110,6 +115,34 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	protected Keys<I2> getKeys2() {
 		return this.keys2;
 	}
+	
+	/**
+	 * Sets a custom partitioner for the CoGroup operation. The partitioner will be called on the CoGroup keys to determine
+	 * the partition a key should be assigned to. The partitioner is evaluated on both inputs in the
+	 * same way.
+	 * <p>
+	 * NOTE: A custom partitioner can only be used with single-field CoGroup keys, not with composite CoGroup keys.
+	 * 
+	 * @param partitioner The custom partitioner to be used, or {@code null} to remove a previously set partitioner.
+	 * @return This CoGroup operator, to allow for method chaining.
+	 */
+	public CoGroupOperator<I1, I2, OUT> withPartitioner(Partitioner<?> partitioner) {
+		if (partitioner != null) {
+			// check the partitioner against the keys of both inputs; no explicit key type info is passed (null)
+			keys1.validateCustomPartitioner(partitioner, null);
+			keys2.validateCustomPartitioner(partitioner, null);
+		}
+		this.customPartitioner = partitioner;
+		return this;
+	}
+	
+	/**
+	 * Gets the custom partitioner used by this CoGroup, or {@code null}, if none is set.
+	 * 
+	 * @return The custom partitioner used by this CoGroup, or {@code null}.
+	 */
+	public Partitioner<?> getPartitioner() {
+		return customPartitioner;
+	}
 
 	@Override
 	protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
@@ -135,8 +168,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
 					getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			po.setDegreeOfParallelism(getParallelism());
+			po.setCustomPartitioner(customPartitioner);
 
 			return po;
 
@@ -152,8 +185,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
 							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			po.setDegreeOfParallelism(getParallelism());
+			po.setCustomPartitioner(customPartitioner);
 
 			return po;
 		}
@@ -168,8 +201,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
 							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			po.setDegreeOfParallelism(getParallelism());
+			po.setCustomPartitioner(customPartitioner);
 
 			return po;
 		}
@@ -193,11 +226,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			po.setFirstInput(input1);
 			po.setSecondInput(input2);
 
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
-
+			po.setDegreeOfParallelism(getParallelism());
+			po.setCustomPartitioner(customPartitioner);
 			return po;
-
 		}
 		else {
 			throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
@@ -495,7 +526,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			}
 
 			public final class CoGroupOperatorWithoutFunction {
+				
 				private final Keys<I2> keys2;
+				
+				/** Custom partitioner passed to the CoGroupOperator created by this builder; {@code null} if none. */
+				private Partitioner<?> customPartitioner;
 
 				private CoGroupOperatorWithoutFunction(Keys<I2> keys2) {
 					if (keys2 == null) {
@@ -508,6 +542,34 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 					this.keys2 = keys2;
 				}
+				
+				/**
+				 * Sets a custom partitioner for the CoGroup operation. The partitioner will be called on the CoGroup keys to determine
+				 * the partition a key should be assigned to. The partitioner is evaluated on both inputs in the
+				 * same way.
+				 * <p>
+				 * NOTE: A custom partitioner can only be used with single-field CoGroup keys, not with composite CoGroup keys.
+				 * 
+				 * @param partitioner The custom partitioner to be used, or {@code null} to remove a previously set partitioner.
+				 * @return This builder, to allow for method chaining. The partitioner is handed to the CoGroup operator
+				 *         that is created once the CoGroup function is applied.
+				 */
+				public CoGroupOperatorWithoutFunction withPartitioner(Partitioner<?> partitioner) {
+					if (partitioner != null) {
+						// keys1 comes from the enclosing builder, keys2 from this one; no explicit key type info (null)
+						keys1.validateCustomPartitioner(partitioner, null);
+						keys2.validateCustomPartitioner(partitioner, null);
+					}
+					this.customPartitioner = partitioner;
+					return this;
+				}
+				
+				/**
+				 * Gets the custom partitioner set on this CoGroup builder, or {@code null}, if none is set.
+				 * 
+				 * @return The custom partitioner set on this CoGroup builder, or {@code null}.
+				 */
+				public Partitioner<?> getPartitioner() {
+					return customPartitioner;
+				}
 
 				/**
 				 * Finalizes a CoGroup transformation by applying a {@link org.apache.flink.api.common.functions.RichCoGroupFunction} to groups of elements with identical keys.<br/>
@@ -524,7 +586,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
-					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, Utils.getCallLocationName());
+					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, 
+							customPartitioner, Utils.getCallLocationName());
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index 1fd9f9f..da71b6d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.Collector
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
+import org.apache.flink.api.common.functions.Partitioner
 
 
 /**
@@ -65,6 +65,8 @@ class CoGroupDataSet[L, R](
     rightKeys: Keys[R])
   extends DataSet(defaultCoGroup) {
 
+  /**
+   * Custom partitioner passed to every CoGroupOperator this data set creates for a CoGroup
+   * function; null if none is set. NOTE(review): this is a public var, so assigning it directly
+   * bypasses the key validation done in `withPartitioner`.
+   */
+  var customPartitioner : Partitioner[_] = _
+  
   /**
    * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the
    * result of the given function.
@@ -84,8 +86,10 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      customPartitioner,
       getCallLocationName())
 
+    
     wrap(coGroupOperator)
   }
 
@@ -109,6 +113,7 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      customPartitioner,
       getCallLocationName())
 
     wrap(coGroupOperator)
@@ -131,10 +136,35 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      customPartitioner,
       getCallLocationName())
 
     wrap(coGroupOperator)
   }
+  
+  // ----------------------------------------------------------------------------------------------
+  //  Properties
+  // ----------------------------------------------------------------------------------------------
+  
+  /**
+   * Sets a custom partitioner for the CoGroup operation. The partitioner is validated against the
+   * keys of both inputs using the type information of its key type `K`. It is used by the default
+   * CoGroup operator wrapped by this data set, and by every operator created later when a CoGroup
+   * function is applied.
+   *
+   * NOTE: As in the Java API, a custom partitioner can only be used with single-field keys, not
+   * with composite keys.
+   *
+   * @param partitioner The custom partitioner, or null to remove a previously set one.
+   * @return This data set, to allow for method chaining.
+   */
+  def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : CoGroupDataSet[L, R] = {
+    if (partitioner != null) {
+      val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]]
+      
+      leftKeys.validateCustomPartitioner(partitioner, typeInfo)
+      rightKeys.validateCustomPartitioner(partitioner, typeInfo)
+    }
+    this.customPartitioner = partitioner
+    // also apply it to the wrapped default operator that this DataSet is built on
+    defaultCoGroup.withPartitioner(partitioner)
+    
+    this
+  }
+
+  /**
+   * Gets the custom partitioner used by this CoGroup, or null, if none is set.
+   *
+   * The key type `K` is not checked: the stored partitioner is cast unchecked to `Partitioner[K]`.
+   */
+  def getPartitioner[K]() : Partitioner[K] = {
+    customPartitioner.asInstanceOf[Partitioner[K]]
+  }
 }
 
 /**
@@ -194,6 +224,7 @@ class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
     }
     val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
       leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType,
+      null, // partitioner
       getCallLocationName())
 
     new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
new file mode 100644
index 0000000..1c6afba
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators.translation
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.scala._
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.compiler.plan.DualInputPlanNode
+
+/**
+ * Compiler tests for custom partitioning of CoGroup in the Scala API.
+ *
+ * When the partitioner's key type matches the CoGroup keys, both inputs must be shipped with
+ * `PARTITION_CUSTOM` using that partitioner. When the key type does not match, `withPartitioner`
+ * must throw an [[InvalidProgramException]]. Each case is covered for tuple positions, POJO field
+ * expressions, and key selector functions.
+ */
+class CoGroupCustomPartitioningTest extends CompilerTestBase {
+  
+  // Long keys (tuple field 1 / field 0) with a Long partitioner: both inputs use it
+  @Test
+  def testCoGroupWithTuples() {
+    try {
+      val partitioner = new TestPartitionerLong()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements( (0L, 0L) )
+      val input2 = env.fromElements( (0L, 0L, 0L) )
+      
+      input1
+          .coGroup(input2)
+          .where(1).equalTo(0)
+          .withPartitioner(partitioner)
+        .print()
+      
+      val p = env.createProgramPlan()
+      val op = compileNoStats(p)
+      
+      val sink = op.getDataSinks.iterator().next()
+      // the node feeding the sink is the CoGroup (the variable name is carried over from the join tests)
+      val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode]
+      
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy)
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy)
+      assertEquals(partitioner, join.getInput1.getPartitioner)
+      assertEquals(partitioner, join.getInput2.getPartitioner)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  // Long tuple keys with an Int partitioner: withPartitioner must reject the partitioner
+  @Test
+  def testCoGroupWithTuplesWrongType() {
+    try {
+      val partitioner = new TestPartitionerInt()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements( (0L, 0L) )
+      val input2 = env.fromElements( (0L, 0L, 0L) )
+      
+      try {
+        input1
+            .coGroup(input2)
+            .where(1).equalTo(0)
+            .withPartitioner(partitioner)
+        fail("should throw an exception")
+      }
+      catch {
+        case e: InvalidProgramException => 
+      }
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  // Int POJO fields selected by name, with an Int partitioner: both inputs use it
+  @Test
+  def testCoGroupWithPojos() {
+    try {
+      val partitioner = new TestPartitionerInt()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements(new Pojo2())
+      val input2 = env.fromElements(new Pojo3())
+      
+      input1
+          .coGroup(input2)
+          .where("b").equalTo("a")
+          .withPartitioner(partitioner)
+        .print()
+        
+      val p = env.createProgramPlan()
+      val op = compileNoStats(p)
+      
+      val sink = op.getDataSinks.iterator().next()
+      // the node feeding the sink is the CoGroup
+      val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode]
+      
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy)
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy)
+      assertEquals(partitioner, join.getInput1.getPartitioner)
+      assertEquals(partitioner, join.getInput2.getPartitioner)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  // Int POJO fields with a Long partitioner: withPartitioner must reject the partitioner
+  @Test
+  def testCoGroupWithPojosWrongType() {
+    try {
+      val partitioner = new TestPartitionerLong()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements(new Pojo2())
+      val input2 = env.fromElements(new Pojo3())
+      
+      try {
+        input1
+            .coGroup(input2)
+            .where("a").equalTo("b")
+            .withPartitioner(partitioner)
+        fail("should throw an exception")
+      }
+      catch {
+        case e: InvalidProgramException => 
+      }
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  // Int keys from key selector functions, with an Int partitioner: both inputs use it
+  @Test
+  def testCoGroupWithKeySelectors() {
+    try {
+      val partitioner = new TestPartitionerInt()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements(new Pojo2())
+      val input2 = env.fromElements(new Pojo3())
+      
+      input1
+          .coGroup(input2)
+          .where( _.a ).equalTo( _.b )
+          .withPartitioner(partitioner)
+        .print()
+          
+      val p = env.createProgramPlan()
+      val op = compileNoStats(p)
+      
+      val sink = op.getDataSinks.iterator().next()
+      // the node feeding the sink is the CoGroup
+      val join = sink.getInput.getSource.asInstanceOf[DualInputPlanNode]
+      
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1.getShipStrategy)
+      assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2.getShipStrategy)
+      assertEquals(partitioner, join.getInput1.getPartitioner)
+      assertEquals(partitioner, join.getInput2.getPartitioner)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  // Int key selector functions with a Long partitioner: withPartitioner must reject the partitioner
+  @Test
+  def testCoGroupWithKeySelectorsWrongType() {
+    try {
+      val partitioner = new TestPartitionerLong()
+      
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements(new Pojo2())
+      val input2 = env.fromElements(new Pojo3())
+      
+      try {
+        input1
+            .coGroup(input2)
+            .where( _.a ).equalTo( _.b )
+            .withPartitioner(partitioner)
+        fail("should throw an exception")
+      }
+      catch {
+        case e: InvalidProgramException => 
+      }
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index debd48d..8cb49b8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -223,30 +223,4 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
       }
     }
   }
-  
-
-  // ----------------------------------------------------------------------------------------------
-  
-  private class TestPartitionerInt extends Partitioner[Int] {
-  
-    override def partition(key: Int, numPartitions: Int): Int = 0
-  }
-  
-  private class TestPartitionerLong extends Partitioner[Long] {
-  
-    override def partition(key: Long, numPartitions: Int): Int = 0
-  }
-  
-  class Pojo2 {
-  
-    var a: Int = _
-    var b: Int = _
-  }
-  
-  class Pojo3 {
-  
-    var a: Int = _
-    var b: Int = _
-    var c: Int = _
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bcdd167f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
new file mode 100644
index 0000000..bcf1869
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators.translation
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.scala._
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.compiler.plan.DualInputPlanNode
+
+
+
+/** Test partitioner for `Int` keys that assigns every key to partition 0. */
+class TestPartitionerInt extends Partitioner[Int] {
+  override def partition(key: Int, numPartitions: Int): Int = 0
+}
+
+/** Test partitioner for `Long` keys that assigns every key to partition 0. */
+class TestPartitionerLong extends Partitioner[Long] {
+  override def partition(key: Long, numPartitions: Int): Int = 0
+}
+
+/** Simple POJO with two `Int` fields, used as input in the custom partitioning tests. */
+class Pojo2 {
+
+  var a: Int = _
+  var b: Int = _
+}
+
+/** Simple POJO with three `Int` fields, used as input in the custom partitioning tests. */
+class Pojo3 {
+
+  var a: Int = _
+  var b: Int = _
+  var c: Int = _
+}


Mime
View raw message