flink-commits mailing list archives

From: se...@apache.org
Subject: [3/4] incubator-flink git commit: [FLINK-1237] Add support for custom partitioners - Functions: GroupReduce, Reduce, Aggregate on UnsortedGrouping, SortedGrouping, Join (Java API & Scala API) - Manual partition on DataSet (Java API & S
Date: Tue, 18 Nov 2014 11:22:51 GMT
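
For orientation: the user-facing hook these tests exercise is withPartitioner(...), attached to a grouping (and, further below, to a join). A minimal sketch of the usage, modeled directly on the calls the tests below make; the job itself is illustrative and not part of the commit:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CustomPartitionerUsage {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(
				new Tuple2<Integer, Integer>(1, 10),
				new Tuple2<Integer, Integer>(2, 20));

		data.groupBy(0)                             // key is tuple field 0 (Integer)
			.withPartitioner(new ModPartitioner())  // groups are shipped with PARTITION_CUSTOM
			.sum(1)
			.print();

		// in this API version print() adds a sink; execute() launches the job
		env.execute();
	}

	// The partitioner's type parameter must match the key type.
	public static class ModPartitioner implements Partitioner<Integer> {
		@Override
		public int partition(Integer key, int numPartitions) {
			return Math.abs(key % numPartitions);
		}
	}
}
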
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
new file mode 100644
index 0000000..8f446a7
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingKeySelectorTranslationTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyReducer;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
+	
+	@Test
+	public void testCustomPartitioningKeySelectorReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
+				.withPartitioner(new TestPartitionerInt())
+				.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode keyRemovingMapper = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) keyRemovingMapper.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, keyRemovingMapper.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningKeySelectorGroupReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
+				.withPartitioner(new TestPartitionerInt())
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningKeySelectorGroupReduceSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
+				.withPartitioner(new TestPartitionerInt())
+				.sortGroup(1, Order.ASCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningKeySelectorGroupReduceSorted2() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			data
+				.groupBy(new TestKeySelector<Tuple4<Integer,Integer,Integer,Integer>>())
+				.withPartitioner(new TestPartitionerInt())
+				.sortGroup(1, Order.ASCENDING)
+				.sortGroup(2, Order.DESCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningKeySelectorInvalidType() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data
+					.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
+					.withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningKeySelectorInvalidTypeSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data
+					.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
+					.sortGroup(1, Order.ASCENDING)
+					.withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleRejectCompositeKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data
+					.groupBy(new TestBinaryKeySelector<Tuple3<Integer,Integer,Integer>>())
+					.withPartitioner(new TestPartitionerInt());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestKeySelector<T extends Tuple> implements KeySelector<T, Integer> {
+		@Override
+		public Integer getKey(T value) {
+			return value.getField(0);
+		}
+	}
+	
+	private static class TestBinaryKeySelector<T extends Tuple> implements KeySelector<T, Tuple2<Integer, Integer>> {
+		@Override
+		public Tuple2<Integer, Integer> getKey(T value) {
+			return new Tuple2<Integer, Integer>(value.<Integer>getField(0), value.<Integer>getField(1));
+		}
+	}
+}

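A note on the negative tests above: the grouping key type is fixed by the KeySelector's return type, and withPartitioner(...) verifies the partitioner's type parameter against it eagerly, at plan-construction time rather than at runtime. A sketch of the rejected shape, assuming the same API as the tests:

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class KeyTypeMismatch {

	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0));

		try {
			data.groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
					@Override
					public Integer getKey(Tuple2<Integer, Integer> value) {
						return value.f0;                    // key type: Integer
					}
				})
				.withPartitioner(new Partitioner<Long>() { // Long does not match Integer
					@Override
					public int partition(Long key, int numPartitions) {
						return 0;
					}
				});
		}
		catch (InvalidProgramException e) {
			System.out.println("rejected eagerly: " + e.getMessage());
		}
	}
}
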
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingPojoTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingPojoTranslationTest.java
new file mode 100644
index 0000000..087d32d
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingPojoTranslationTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyReducer;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class GroupingPojoTranslationTest extends CompilerTestBase {
+	
+	@Test
+	public void testCustomPartitioningTupleReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> data = env.fromElements(new Pojo2())
+					.rebalance().setParallelism(4);
+			
+			data.groupBy("a").withPartitioner(new TestPartitionerInt())
+				.reduce(new DummyReducer<Pojo2>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> data = env.fromElements(new Pojo2())
+					.rebalance().setParallelism(4);
+			
+			data.groupBy("a").withPartitioner(new TestPartitionerInt())
+				.reduceGroup(new IdentityGroupReducer<Pojo2>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduceSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo3> data = env.fromElements(new Pojo3())
+					.rebalance().setParallelism(4);
+			
+			data.groupBy("a").withPartitioner(new TestPartitionerInt())
+				.sortGroup("b", Order.ASCENDING)
+				.reduceGroup(new IdentityGroupReducer<Pojo3>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduceSorted2() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo4> data = env.fromElements(new Pojo4())
+					.rebalance().setParallelism(4);
+			
+			data.groupBy("a").withPartitioner(new TestPartitionerInt())
+				.sortGroup("b", Order.ASCENDING)
+				.sortGroup("c", Order.DESCENDING)
+				.reduceGroup(new IdentityGroupReducer<Pojo4>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleInvalidType() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> data = env.fromElements(new Pojo2())
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy("a").withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleInvalidTypeSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo3> data = env.fromElements(new Pojo3())
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy("a")
+					.sortGroup("b", Order.ASCENDING)
+					.withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleRejectCompositeKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> data = env.fromElements(new Pojo2())
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy("a", "b")
+					.withPartitioner(new TestPartitionerInt());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Pojo2 {
+		public int a;
+		public int b;
+	}
+	
+	public static class Pojo3 {
+		public int a;
+		public int b;
+		public int c;
+	}
+	
+	public static class Pojo4 {
+		public int a;
+		public int b;
+		public int c;
+		public int d;
+	}
+	
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+}

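The POJO tests above drive the same hook through field-expression keys, optionally combined with sortGroup(...). A compact sketch under those assumptions; the Counter POJO here is hypothetical, shaped like Pojo2 above:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PojoPartitioning {

	// hypothetical POJO: public class, public fields, public no-arg constructor
	public static class Counter {
		public int a;   // grouping key
		public int b;   // value to combine
		public Counter() {}
		public Counter(int a, int b) { this.a = a; this.b = b; }
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Counter> data = env.fromElements(new Counter(1, 10), new Counter(1, 32));

		data.groupBy("a")                                    // field-expression key
			.withPartitioner(new Partitioner<Integer>() {    // field "a" is int -> Integer
				@Override
				public int partition(Integer key, int numPartitions) {
					return Math.abs(key % numPartitions);
				}
			})
			.reduce(new ReduceFunction<Counter>() {
				@Override
				public Counter reduce(Counter v1, Counter v2) {
					return new Counter(v1.a, v1.b + v2.b);
				}
			})
			.print();

		env.execute();
	}
}
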
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingTupleTranslationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingTupleTranslationTest.java
new file mode 100644
index 0000000..7cfabfb
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/GroupingTupleTranslationTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyReducer;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class GroupingTupleTranslationTest extends CompilerTestBase {
+	
+	@Test
+	public void testCustomPartitioningTupleAgg() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(0).withPartitioner(new TestPartitionerInt())
+				.sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(0).withPartitioner(new TestPartitionerInt())
+				.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(0).withPartitioner(new TestPartitionerInt())
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduceSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(0).withPartitioner(new TestPartitionerInt())
+				.sortGroup(1, Order.ASCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleGroupReduceSorted2() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple4<Integer,Integer,Integer, Integer>> data = env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			data.groupBy(0).withPartitioner(new TestPartitionerInt())
+				.sortGroup(1, Order.ASCENDING)
+				.sortGroup(2, Order.DESCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleInvalidType() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy(0).withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleInvalidTypeSorted() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy(0)
+					.sortGroup(1, Order.ASCENDING)
+					.withPartitioner(new TestPartitionerLong());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningTupleRejectCompositeKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
+					.rebalance().setParallelism(4);
+			
+			try {
+				data.groupBy(0, 1)
+					.withPartitioner(new TestPartitionerInt());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
new file mode 100644
index 0000000..0020c66
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class JoinCustomPartitioningTest extends CompilerTestBase {
+
+	@Test
+	public void testJoinWithTuples() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input1
+				.join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy());
+			assertEquals(partitioner, join.getInput1().getPartitioner());
+			assertEquals(partitioner, join.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJoinWithTuplesWrongType() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			try {
+				input1
+					.join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0)
+					.withPartitioner(partitioner);
+				
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJoinWithPojos() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			input1
+				.join(input2, JoinHint.REPARTITION_HASH_FIRST)
+				.where("b").equalTo("a").withPartitioner(partitioner)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy());
+			assertEquals(partitioner, join.getInput1().getPartitioner());
+			assertEquals(partitioner, join.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJoinWithPojosWrongType() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			try {
+				input1
+					.join(input2, JoinHint.REPARTITION_HASH_FIRST)
+					.where("a").equalTo("b")
+					.withPartitioner(partitioner);
+				
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJoinWithKeySelectors() {
+		try {
+			final Partitioner<Integer> partitioner = new TestPartitionerInt();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			input1
+				.join(input2, JoinHint.REPARTITION_HASH_FIRST)
+				.where(new Pojo2KeySelector())
+				.equalTo(new Pojo3KeySelector())
+				.withPartitioner(partitioner)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy());
+			assertEquals(partitioner, join.getInput1().getPartitioner());
+			assertEquals(partitioner, join.getInput2().getPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testJoinWithKeySelectorsWrongType() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
+			DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
+			
+			try {
+				input1
+					.join(input2, JoinHint.REPARTITION_HASH_FIRST)
+					.where(new Pojo2KeySelector())
+					.equalTo(new Pojo3KeySelector())
+					.withPartitioner(partitioner);
+				
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	public static class Pojo2 {
+		public int a;
+		public int b;
+	}
+	
+	public static class Pojo3 {
+		public int a;
+		public int b;
+		public int c;
+	}
+	
+	private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> {
+		@Override
+		public Integer getKey(Pojo2 value) {
+			return value.a;
+		}
+	}
+	
+	private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> {
+		@Override
+		public Integer getKey(Pojo3 value) {
+			return value.b;
+		}
+	}
+}

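For joins, the partitioner is attached after where(...).equalTo(...), and the optimizer then ships both inputs with PARTITION_CUSTOM using the same partitioner, as the assertions above check. A sketch of the plain usage (the tests additionally pass a JoinHint):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class JoinWithCustomPartitioner {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 1L));
		DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 2L, 3L));

		input1.join(input2)
			.where(1).equalTo(0)                        // join key type on both sides: Long
			.withPartitioner(new Partitioner<Long>() {  // applied to both inputs
				@Override
				public int partition(Long key, int numPartitions) {
					return (int) (Math.abs(key) % numPartitions);
				}
			})
			.print();

		env.execute();
	}
}
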
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..fc8616e
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.OptimizerNode;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class GlobalPropertiesFilteringTest {
+
+	@Test
+	public void testCustomPartitioningPreserves() {
+		try {
+			Partitioner<?> partitioner = new MockPartitioner();
+			
+			GlobalProperties gp = new GlobalProperties();
+			gp.setCustomPartitioned(new FieldList(2, 3), partitioner);
+			
+			OptimizerNode node = mock(OptimizerNode.class);
+			when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(true);
+			
+			GlobalProperties filtered = gp.filterByNodesConstantSet(node, 0);
+			
+			assertTrue(filtered.isPartitionedOnFields(new FieldSet(2, 3)));
+			assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, filtered.getPartitioning());
+			assertEquals(partitioner, filtered.getCustomPartitioner());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
new file mode 100644
index 0000000..fd4ad82
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.junit.Test;
+
+public class GlobalPropertiesMatchingTest {
+
+	@Test
+	public void testMatchingAnyPartitioning() {
+		try {
+			
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setAnyPartitioning(new FieldSet(6, 2));
+			
+			// match any partitioning
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setAnyPartitioning(new FieldList(2, 6));
+				assertTrue(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setAnyPartitioning(new FieldList(6, 2));
+				assertTrue(req.isMetBy(gp2));
+				
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setAnyPartitioning(new FieldList(6, 1));
+				assertFalse(req.isMetBy(gp3));
+				
+				GlobalProperties gp4 = new GlobalProperties();
+				gp4.setAnyPartitioning(new FieldList(2));
+				assertTrue(req.isMetBy(gp4));
+			}
+			
+			// match hash partitioning
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setHashPartitioned(new FieldList(2, 6));
+				assertTrue(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setHashPartitioned(new FieldList(6, 2));
+				assertTrue(req.isMetBy(gp2));
+				
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setHashPartitioned(new FieldList(6, 1));
+				assertFalse(req.isMetBy(gp3));
+			}
+			
+			// match range partitioning
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+				assertTrue(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+				assertTrue(req.isMetBy(gp2));
+
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+				assertFalse(req.isMetBy(gp3));
+				
+				GlobalProperties gp4 = new GlobalProperties();
+				gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+				assertTrue(req.isMetBy(gp4));
+			}
+			
+			// match custom partitioning
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setCustomPartitioned(new FieldList(2, 6), new MockPartitioner());
+				assertTrue(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+				assertTrue(req.isMetBy(gp2));
+				
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setCustomPartitioned(new FieldList(6, 1), new MockPartitioner());
+				assertFalse(req.isMetBy(gp3));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMatchingCustomPartitioning() {
+		try {
+			final Partitioner<Long> partitioner = new MockPartitioner();
+			
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setCustomPartitioned(new FieldSet(6, 2), partitioner);
+			
+			// match custom partitionings
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setCustomPartitioned(new FieldList(2, 6), partitioner);
+				assertTrue(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setCustomPartitioned(new FieldList(6, 2), partitioner);
+				assertTrue(req.isMetBy(gp2));
+				
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());
+				assertFalse(req.isMetBy(gp3));
+			}
+			
+			// cannot match other types of partitionings
+			{
+				GlobalProperties gp1 = new GlobalProperties();
+				gp1.setAnyPartitioning(new FieldList(6, 2));
+				assertFalse(req.isMetBy(gp1));
+				
+				GlobalProperties gp2 = new GlobalProperties();
+				gp2.setHashPartitioned(new FieldList(6, 2));
+				assertFalse(req.isMetBy(gp2));
+				
+				GlobalProperties gp3 = new GlobalProperties();
+				gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+				assertFalse(req.isMetBy(gp3));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+}

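Two properties these matching tests pin down: the order of key fields in the requested set is irrelevant, while a requested custom partitioning is met only by the same partitioner as judged by equals (which MockPartitioner does not override). A distilled sketch; it has to live in the tests' package because MockPartitioner is package-private:

package org.apache.flink.compiler.dataproperties;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;

public class CustomPartitioningMatchingSketch {

	public static void main(String[] args) {
		Partitioner<Long> p = new MockPartitioner();

		RequestedGlobalProperties req = new RequestedGlobalProperties();
		req.setCustomPartitioned(new FieldSet(6, 2), p);

		GlobalProperties reordered = new GlobalProperties();
		reordered.setCustomPartitioned(new FieldList(2, 6), p);  // same instance, fields swapped

		GlobalProperties different = new GlobalProperties();
		different.setCustomPartitioned(new FieldList(6, 2), new MockPartitioner());

		System.out.println(req.isMetBy(reordered));   // true: field order does not matter
		System.out.println(req.isMetBy(different));   // false: not the same partitioner instance
	}
}
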
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java
new file mode 100644
index 0000000..f99ebb6
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesPushdownTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.OptimizerNode;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+public class GlobalPropertiesPushdownTest {
+
+	@Test
+	public void testAnyPartitioningPushedDown() {
+		try {
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setAnyPartitioning(new FieldSet(3, 1));
+			
+			RequestedGlobalProperties preserved = req.filterByNodesConstantSet(getAllPreservingNode(), 0);
+			assertEquals(PartitioningProperty.ANY_PARTITIONING, preserved.getPartitioning());
+			assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+			
+			RequestedGlobalProperties nonPreserved = req.filterByNodesConstantSet(getNonePreservingNode(), 0);
+			assertTrue(nonPreserved == null || nonPreserved.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashPartitioningPushedDown() {
+		try {
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setHashPartitioned(new FieldSet(3, 1));
+			
+			RequestedGlobalProperties preserved = req.filterByNodesConstantSet(getAllPreservingNode(), 0);
+			assertEquals(PartitioningProperty.HASH_PARTITIONED, preserved.getPartitioning());
+			assertTrue(preserved.getPartitionedFields().isValidSubset(new FieldSet(1, 3)));
+			
+			RequestedGlobalProperties nonPreserved = req.filterByNodesConstantSet(getNonePreservingNode(), 0);
+			assertTrue(nonPreserved == null || nonPreserved.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningNotPushedDown() {
+		try {
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setCustomPartitioned(new FieldSet(3, 1), new MockPartitioner());
+			
+			RequestedGlobalProperties pushedDown = req.filterByNodesConstantSet(getAllPreservingNode(), 0);
+			assertTrue(pushedDown == null || pushedDown.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testForcedRebalancingNotPushedDown() {
+		try {
+			RequestedGlobalProperties req = new RequestedGlobalProperties();
+			req.setForceRebalancing();
+			
+			RequestedGlobalProperties pushedDown = req.filterByNodesConstantSet(getAllPreservingNode(), 0);
+			assertTrue(pushedDown == null || pushedDown.isTrivial());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static OptimizerNode getAllPreservingNode() {
+		OptimizerNode node = mock(OptimizerNode.class);
+		when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(true);
+		return node;
+	}
+	
+	private static OptimizerNode getNonePreservingNode() {
+		OptimizerNode node = mock(OptimizerNode.class);
+		when(node.isFieldConstant(Matchers.anyInt(), Matchers.anyInt())).thenReturn(false);
+		return node;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
new file mode 100644
index 0000000..71e4c3a
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+
+class MockPartitioner implements Partitioner<Long> {
+	
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public int partition(Long key, int numPartitions) {
+		return 0;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/DistinctAndGroupingOptimizerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DistinctAndGroupingOptimizerTest.java
new file mode 100644
index 0000000..45b389a
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DistinctAndGroupingOptimizerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+@SuppressWarnings("serial")
+public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
+	
+	@Test
+	public void testDistinctPreservesPartitioningOfDistinctFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(4);
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+					.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+			
+			data.distinct(0)
+				.groupBy(0)
+				.sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			// reducer can be forward, reuses partitioning from distinct
+			assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+			
+			// distinct reducer is partitioned
+			assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDistinctDestroysPartitioningOfNonDistinctFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(4);
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+					.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+			
+			data.distinct(1)
+				.groupBy(0)
+				.sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			// reducer must repartition, because it works on a different field
+			assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+			
+			// distinct reducer is partitioned
+			assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyReducer.java
new file mode 100644
index 0000000..a536bfd
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyReducer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+
+public class DummyReducer<T> extends RichReduceFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T reduce(T a, T b) {
+		return a; // dummy reduce: always keep the first of the two elements
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityPartitionerMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityPartitionerMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityPartitionerMapper.java
new file mode 100644
index 0000000..d5c6cfe
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityPartitionerMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityPartitionerMapper<T> extends RichMapPartitionFunction<T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void mapPartition(Iterable<T> values, Collector<T> out) {
+		for (T in : values) {
+			out.collect(in);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
new file mode 100644
index 0000000..f686e94
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+/**
+ * Function to implement a custom partition assignment for keys.
+ * 
+ * @param <K> The type of the key to be partitioned.
+ */
+public interface Partitioner<K> extends java.io.Serializable {
+
+	/**
+	 * Computes the partition for the given key.
+	 * 
+	 * @param key The key.
+	 * @param numPartitions The number of partitions to partition into.
+	 * @return The partition index.
+	 */
+	int partition(K key, int numPartitions);
+}

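For illustration, a minimal implementation of this interface could look like the sketch below. The class name and the modulo scheme are hypothetical and not part of the patch; the only hard requirement, per the Javadoc above, is that the returned index lies in [0, numPartitions).

	import org.apache.flink.api.common.functions.Partitioner;

	public class ModuloPartitioner implements Partitioner<Long> {

		private static final long serialVersionUID = 1L;

		@Override
		public int partition(Long key, int numPartitions) {
			// the remainder has magnitude < numPartitions, so taking the
			// absolute value keeps the index inside [0, numPartitions)
			return (int) Math.abs(key % numPartitions);
		}
	}
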
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index f500717..ddfd874 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -50,13 +51,13 @@ import java.util.List;
  */
 public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 
-	/**
-	 * The ordering for the order inside a reduce group.
-	 */
+	/** The ordering for the order inside a reduce group. */
 	private Ordering groupOrder;
 
 	private boolean combinable;
 	
+	private Partitioner<?> customPartitioner;
+	
 	
 	public GroupReduceOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, int[] keyPositions, String name) {
 		super(udf, operatorInfo, keyPositions, name);
@@ -82,7 +83,8 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
 	}
 	
-
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Sets the order of the elements within a reduce group.
 	 * 
@@ -102,8 +104,6 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		return this.groupOrder;
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
 	/**
 	 * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the
 	 * data before the actual group reduce operations. Combinable user-defined functions
@@ -132,6 +132,23 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		return this.combinable;
 	}
 
+	public void setCustomPartitioner(Partitioner<?> customPartitioner) {
+		if (customPartitioner != null) {
+			int[] keys = getKeyColumns(0);
+			if (keys == null || keys.length == 0) {
+				throw new IllegalArgumentException("Cannot use custom partitioner for a non-grouped GroupReduce (AllGroupReduce)");
+			}
+			if (keys.length > 1) {
+				throw new IllegalArgumentException("Cannot use a custom partitioner for composite keys (more than one key field)");
+			}
+		}
+		this.customPartitioner = customPartitioner;
+	}
+	
+	public Partitioner<?> getCustomPartitioner() {
+		return customPartitioner;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 
 	@Override

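A usage sketch for the grouped-reduce path (not part of the patch): the setter above is assumed to be driven by the withPartitioner(...) hook that this commit adds to groupings in the Java API; ModuloPartitioner is the hypothetical partitioner sketched earlier, and IdentityGroupReducer is the test function from this patch's test utilities.

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple2<Long, String>> data = env.fromElements(
			new Tuple2<Long, String>(1L, "a"),
			new Tuple2<Long, String>(2L, "b"));

	// group on field 0 and ship the groups with the custom partitioner
	// instead of the default hash partitioning
	data.groupBy(0)
		.withPartitioner(new ModuloPartitioner())
		.reduceGroup(new IdentityGroupReducer<Tuple2<Long, String>>())
		.print();
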
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index ba71b01..bc0f4a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -98,6 +99,8 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 	
 	private JoinHint joinHint = JoinHint.OPTIMIZER_CHOOSES;
 	
+	private Partitioner<?> partitioner;
+	
 	
 	public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
 		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
@@ -123,6 +126,14 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		return joinHint;
 	}
 	
+	public void setCustomPartitioner(Partitioner<?> partitioner) {
+		this.partitioner = partitioner;
+	}
+	
+	public Partitioner<?> getCustomPartitioner() {
+		return partitioner;
+	}
+	
 	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("unchecked")
@@ -143,35 +154,37 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		TypeComparator<IN1> leftComparator;
 		TypeComparator<IN2> rightComparator;
 
-		if (leftInformation instanceof AtomicType){
+		if (leftInformation instanceof AtomicType) {
 			leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true);
 		}
-		else if(leftInformation instanceof CompositeType){
+		else if (leftInformation instanceof CompositeType) {
 			int[] keyPositions = getKeyColumns(0);
 			boolean[] orders = new boolean[keyPositions.length];
 			Arrays.fill(orders, true);
 
 			leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0);
-		}else{
+		}
+		else {
 			throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
 					.getCanonicalName() + " is not supported. Could not generate a comparator.");
 		}
 
-		if(rightInformation instanceof AtomicType){
+		if (rightInformation instanceof AtomicType) {
 			rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true);
-		}else if(rightInformation instanceof CompositeType){
+		}
+		else if (rightInformation instanceof CompositeType) {
 			int[] keyPositions = getKeyColumns(1);
 			boolean[] orders = new boolean[keyPositions.length];
 			Arrays.fill(orders, true);
 
 			rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0);
-		}else{
+		}
+		else {
 			throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
 					.getCanonicalName() + " is not supported. Could not generate a comparator.");
 		}
 
-		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator,
-				rightComparator);
+		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
 
 		List<OUT> result = new ArrayList<OUT>();
 		Collector<OUT> collector = mutableObjectSafe ? new CopyingListCollector<OUT>(result, outInformation.createSerializer())
@@ -196,7 +209,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 
 			if (matchingHashes != null) {
 				pairComparator.setReference(left);
-				for (IN2 right : matchingHashes){
+				for (IN2 right : matchingHashes) {
 					if (pairComparator.equalToReference(right)) {
 						if (mutableObjectSafe) {
 							function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);

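The corresponding sketch for the join path, again assuming the withPartitioner(...) hook this commit adds to joins in the Java API. With a custom partitioner set, both inputs are shipped with the same partitioner, so that equal keys meet in the same parallel instance of the join.

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple2<Long, String>> left = env.fromElements(new Tuple2<Long, String>(1L, "a"));
	DataSet<Tuple2<Long, Double>> right = env.fromElements(new Tuple2<Long, Double>(1L, 0.5));

	left.join(right)
		.where(0)
		.equalTo(0)
		.withPartitioner(new ModuloPartitioner())
		.print();
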
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index af8a111..ee3b259 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
@@ -32,8 +33,20 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
  */
 public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> {
 	
+	public static enum PartitionMethod {
+		REBALANCE,
+		HASH,
+		RANGE,
+		CUSTOM;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	private final PartitionMethod partitionMethod;
 	
+	private Partitioner<?> customPartitioner;
+	
+	
 	public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, int[] keys, String name) {
 		super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, keys, name);
 		this.partitionMethod = pMethod;
@@ -44,16 +57,31 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
 		this.partitionMethod = pMethod;
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	
 	public PartitionMethod getPartitionMethod() {
 		return this.partitionMethod;
 	}
 	
-	public static enum PartitionMethod {
-		REBALANCE,
-		HASH,
-		RANGE;
+	public Partitioner<?> getCustomPartitioner() {
+		return customPartitioner;
+	}
+	
+	public void setCustomPartitioner(Partitioner<?> customPartitioner) {
+		if (customPartitioner != null) {
+			int[] keys = getKeyColumns(0);
+			if (keys == null || keys.length == 0) {
+				throw new IllegalArgumentException("Cannot use a custom partitioner without key fields to partition on");
+			}
+			if (keys.length > 1) {
+				throw new IllegalArgumentException("Cannot use a custom partitioner for composite keys (more than one key field)");
+			}
+		}
+		this.customPartitioner = customPartitioner;
 	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	@Override
 	protected List<IN> executeOnCollections(List<IN> inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
 		return inputData;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 30ff176..f1bf0e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -50,6 +51,9 @@ import java.util.Map;
  */
 public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> {
 
+	private Partitioner<?> customPartitioner;
+	
+	
 	/**
 	 * Creates a grouped reduce data flow operator.
 	 * 
@@ -124,7 +128,26 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 	}
 
 	// --------------------------------------------------------------------------------------------
+	
+	public void setCustomPartitioner(Partitioner<?> customPartitioner) {
+		if (customPartitioner != null) {
+			int[] keys = getKeyColumns(0);
+			if (keys == null || keys.length == 0) {
+				throw new IllegalArgumentException("Cannot use a custom partitioner for a non-grouped Reduce (AllReduce)");
+			}
+			if (keys.length > 1) {
+				throw new IllegalArgumentException("Cannot use a custom partitioner for composite keys (more than one key field)");
+			}
+		}
+		this.customPartitioner = customPartitioner;
+	}
+	
+	public Partitioner<?> getCustomPartitioner() {
+		return customPartitioner;
+	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	@Override
 	protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
 		// make sure we can handle empty inputs

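The plain reduce path follows the same pattern and the same restriction: the setter above accepts exactly one key field. A sketch, where DummyReducer is the test function added by this patch and ModuloPartitioner the hypothetical partitioner from earlier:

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Long, String>> data = env.fromElements(new Tuple2<Long, String>(1L, "a"));

	// fine: a single key field
	data.groupBy(0)
		.withPartitioner(new ModuloPartitioner())
		.reduce(new DummyReducer<Tuple2<Long, String>>());

	// expected to fail: the setter above throws an IllegalArgumentException
	// for composite keys once the program is translated into a plan
	data.groupBy(0, 1)
		.withPartitioner(new ModuloPartitioner())
		.reduce(new DummyReducer<Tuple2<Long, String>>());
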
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
index f98f6e0..f98a05e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
@@ -294,7 +294,7 @@ public abstract class TypeComparator<T> implements Serializable {
 	public abstract int extractKeys(Object record, Object[] target, int index);
 
 	/**
-	 * Get the field comparators. This is used together with {@link #extractKeys(Object)} to provide
+	 * Get the field comparators. This is used together with {@link #extractKeys(Object, Object[], int)} to provide
 	 * interoperability between different record types.
 	 */
 	@SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 0ab8e72..feb2223 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 1a742b6..fd23d40 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index dadd1ca..50c6b98 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java
index 24783ac..39a3301 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldListTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.util;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java
index 7549f43..27d8bcc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/FieldSetTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.util;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c78cc7a..6b768b7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -153,8 +154,8 @@ public abstract class DataSet<T> {
 
 
 
-    /**
-     * Applies a Map-style operation to the entire partition of the data.
+	/**
+	 * Applies a Map-style operation to the entire partition of the data.
 	 * The function is called once per parallel partition of the data,
 	 * and the entire partition is available through the given Iterator.
 	 * The number of elements that each instance of the MapPartition function
@@ -165,12 +166,12 @@ public abstract class DataSet<T> {
 	 * the use of {@code map()} and {@code flatMap()} is preferable.
 	 *
 	 * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
-     * @return A MapPartitionOperator that represents the transformed DataSet.
-     *
-     * @see MapPartitionFunction
-     * @see MapPartitionOperator
-     * @see DataSet
-     */
+	 * @return A MapPartitionOperator that represents the transformed DataSet.
+	 *
+	 * @see MapPartitionFunction
+	 * @see MapPartitionOperator
+	 * @see DataSet
+	 */
 	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition ){
 		if (mapPartition == null) {
 			throw new NullPointerException("MapPartition function must not be null.");
@@ -344,7 +345,7 @@ public abstract class DataSet<T> {
 		return new GroupReduceOperator<T, R>(this, resultType, reducer, Utils.getCallLocationName());
 	}
 
-/**
+	/**
 	 * Applies a special case of a reduce transformation (minBy) on a non-grouped {@link DataSet}.<br/>
 	 * The transformation consecutively calls a {@link ReduceFunction} 
 	 * until only a single element remains which is the result of the transformation.
@@ -926,12 +927,59 @@ public abstract class DataSet<T> {
 	}
 	
 	/**
-	 * Enforces a rebalancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the 
+	 * Partitions a tuple DataSet on the specified key field using a custom partitioner.
+	 * This method takes the key position to partition on, and a partitioner that accepts the key type.
+	 * <p> 
+	 * Note: This method works only on single field keys.
+	 * 
+	 * @param partitioner The partitioner to assign partitions to keys.
+	 * @param field The field index on which the DataSet is to be partitioned.
+	 * @return The partitioned DataSet.
+	 */
+	public <K> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, int field) {
+		return new PartitionOperator<T>(this, new Keys.ExpressionKeys<T>(new int[] {field}, getType(), false), partitioner, Utils.getCallLocationName());
+	}
+	
+	/**
+	 * Partitions a POJO DataSet on the specified key field using a custom partitioner.
+	 * This method takes the key expression to partition on, and a partitioner that accepts the key type.
+	 * <p>
+	 * Note: This method works only on single field keys.
+	 * 
+	 * @param partitioner The partitioner to assign partitions to keys.
+	 * @param field The field expression on which the DataSet is to be partitioned.
+	 * @return The partitioned DataSet.
+	 */
+	public <K> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, String field) {
+		return new PartitionOperator<T>(this, new Keys.ExpressionKeys<T>(new String[] {field}, getType()), partitioner, Utils.getCallLocationName());
+	}
+	
+	/**
+	 * Partitions a DataSet on the key returned by the selector, using a custom partitioner.
+	 * This method takes the key selector to get the key to partition on, and a partitioner that
+	 * accepts the key type.
+	 * <p>
+	 * Note: This method works only on single field keys, i.e. the selector cannot return tuples
+	 * of fields.
+	 * 
+	 * @param partitioner The partitioner to assign partitions to keys.
+	 * @param keyExtractor The KeySelector that extracts the key to partition on.
+	 * @return The partitioned DataSet.
+	 * 
+	 * @see KeySelector
+	 */
+	public <K extends Comparable<K>> PartitionOperator<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keyExtractor) {
+		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		return new PartitionOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType), partitioner, Utils.getCallLocationName());
+	}
+	
+	/**
+	 * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the 
 	 * following task. This can help to improve performance in case of heavy data skew and compute intensive operations.
 	 * <p>
 	 * <b>Important:</b>This operation shuffles the whole DataSet over the network and can take significant amount of time.
 	 * 
-	 * @return The rebalanced DataSet.
+	 * @return The re-balanced DataSet.
 	 */
 	public PartitionOperator<T> rebalance() {
 		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE, Utils.getCallLocationName());

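Taken together, the three partitionCustom variants above can be exercised as in the following sketch. ModuloPartitioner is the hypothetical partitioner from earlier, and the commented field-expression variant assumes a POJO data set with a made-up "id" field.

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple2<Long, String>> tuples = env.fromElements(new Tuple2<Long, String>(1L, "a"));

	// 1) tuple position key
	tuples.partitionCustom(new ModuloPartitioner(), 0);

	// 2) field expression key, for POJO data sets:
	// pojos.partitionCustom(new ModuloPartitioner(), "id");

	// 3) key selector returning a single (comparable) key
	tuples.partitionCustom(new ModuloPartitioner(), new KeySelector<Tuple2<Long, String>, Long>() {
		@Override
		public Long getKey(Tuple2<Long, String> value) {
			return value.f0;
		}
	});
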
