flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:06:41 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..9f2c467
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+
+public class RequestedGlobalPropertiesFilteringTest {
+	// Type information for an 8-field Integer tuple; passed to SemanticPropUtil whenever a test parses a field string.
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	@Test(expected = NullPointerException.class)
+	public void testNullProps() {
+		// Filtering with null semantic properties is expected to throw a NullPointerException.
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+		rgProps.filterBySemanticProperties(null, 0);
+	}
+
+	@Test
+	public void testEraseAll1() {
+		// Semantic properties are left freshly constructed (no field string); the any-partitioning request on 0, 1, 2 is expected to filter to null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testEraseAll2() {
+		// The field string "3;4" mentions none of the requested fields 0, 1, 2; the filtered request is expected to be null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testHashPartitioningPreserved1() {
+		// The field string "0;3;4" lists exactly the hash-partitioned fields; the request is expected to come back unchanged.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// Same partitioning type and the same three fields; distribution, partitioner and ordering stay unset.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(0));
+		assertTrue(filtered.getPartitionedFields().contains(3));
+		assertTrue(filtered.getPartitionedFields().contains(4));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testHashPartitioningPreserved2() {
+		// With field string "2->0;1->3;7->4", the hash request on fields 0, 3, 4 is expected to come back on fields 2, 1, 7.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// The partitioning type is kept; only the field numbers are rewritten.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(1));
+		assertTrue(filtered.getPartitionedFields().contains(2));
+		assertTrue(filtered.getPartitionedFields().contains(7));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testHashPartitioningErased() {
+		// The field string "1;2" mentions none of the hash-partitioned fields 0, 3, 4; the filtered request is expected to be null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved1() {
+		// The field string "0;3;4" lists exactly the requested fields; the any-partitioning request is expected to come back unchanged.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// Same partitioning type and the same three fields; distribution, partitioner and ordering stay unset.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(0));
+		assertTrue(filtered.getPartitionedFields().contains(3));
+		assertTrue(filtered.getPartitionedFields().contains(4));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved2() {
+		// With field string "2->0;1->3;7->4", the any-partitioning request on fields 0, 3, 4 is expected to come back on fields 2, 1, 7.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// The partitioning type is kept; only the field numbers are rewritten.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(1));
+		assertTrue(filtered.getPartitionedFields().contains(2));
+		assertTrue(filtered.getPartitionedFields().contains(7));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testAnyPartitioningErased() {
+		// The field string "1;2" mentions none of the requested fields 0, 3, 4; the filtered request is expected to be null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRangePartitioningPreserved1() {
+		// The field string "1;3;6" lists every field of the requested range ordering; the request is expected to come back intact.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo);
+		// Requested ordering, in key order: field 3 (descending), field 1 (ascending), field 6 (descending).
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// Field numbers, types and sort directions are all expected at their original key positions.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningPreserved2() {
+		// With field string "7->3;1->1;2->6", the range ordering on fields 3, 1, 6 is expected to come back on fields 7, 1, 2.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+		// Requested ordering, in key order: field 3 (descending), field 1 (ascending), field 6 (descending).
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// Only the field numbers change; types and sort directions stay at the same key positions.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningPreserved3() {
+		// Same field string and ordering as testRangePartitioningPreserved2, but the request also carries a data distribution.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+		// MockDistribution is declared outside this file; the test only checks that it is handed back.
+		DataDistribution dd = new MockDistribution();
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o, dd);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+		// Ordering fields are expected as 7, 1, 2, and the filtered distribution is expected to equal dd.
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNotNull(filtered.getDataDistribution());
+		assertEquals(dd, filtered.getDataDistribution());
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningErased() {
+		// The field string "1;2" mentions only one of the three ordering fields (1, but not 3 or 6); the filtered request is expected to be null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testCustomPartitioningErased() {
+		// Although the field string "0;1;2" lists every custom-partitioned field, the filtered request is expected to be null.
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner());
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRandomDistributionErased() {
+		// A random-partitioning request is expected to filter to null (field string used here: "0;1;2").
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRandomPartitioning();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testReplicationErased() {
+		// A full-replication request is expected to filter to null (field string used here: "0;1;2").
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setFullyReplicated();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRebalancingErased() {
+		// A forced-rebalancing request is expected to filter to null (field string used here: "0;1;2").
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setForceRebalancing();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testDualHashPartitioningPreserved() {
+		// Two-input case: two field-string arrays are supplied, and each request is filtered with its own input index (0 and 1).
+		DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"},
+				null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+		RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+		gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+		gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+		RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0);
+		RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1);
+		// Index 0 (matches the first field string): fields 0, 2, 4 are expected unchanged.
+		assertNotNull(filtered1);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning());
+		assertNotNull(filtered1.getPartitionedFields());
+		assertEquals(3, filtered1.getPartitionedFields().size());
+		assertTrue(filtered1.getPartitionedFields().contains(0));
+		assertTrue(filtered1.getPartitionedFields().contains(2));
+		assertTrue(filtered1.getPartitionedFields().contains(4));
+		assertNull(filtered1.getOrdering());
+		assertNull(filtered1.getCustomPartitioner());
+		assertNull(filtered1.getDataDistribution());
+		// Index 1 (matches the second field string): fields 3, 6, 7 are expected to be rewritten to 1, 4, 3.
+		assertNotNull(filtered2);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning());
+		assertNotNull(filtered2.getPartitionedFields());
+		assertEquals(3, filtered2.getPartitionedFields().size());
+		assertTrue(filtered2.getPartitionedFields().contains(1));
+		assertTrue(filtered2.getPartitionedFields().contains(3));
+		assertTrue(filtered2.getPartitionedFields().contains(4));
+		assertNull(filtered2.getOrdering());
+		assertNull(filtered2.getCustomPartitioner());
+		assertNull(filtered2.getDataDistribution());
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+		gprops.setHashPartitioned(new FieldSet(0,1));
+		// sprops is a single-input properties object, yet input index 1 is passed; an IndexOutOfBoundsException is expected.
+		gprops.filterBySemanticProperties(sprops, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..0cede0e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+public class RequestedLocalPropertiesFilteringTest {
+	// Type information for an 8-field Integer tuple; passed to SemanticPropUtil whenever a test parses a field string.
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	@Test(expected = NullPointerException.class)
+	public void testNullProps() {
+		// Filtering with null semantic properties is expected to throw a NullPointerException.
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		rlProp.filterBySemanticProperties(null, 0);
+	}
+
+	@Test
+	public void testAllErased() {
+		// Semantic properties are left freshly constructed (no field string); the grouping request on 0, 2, 3 is expected to filter to null.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testGroupingPreserved1() {
+		// The field string "0;2;3" lists exactly the grouped fields; the grouping request is expected to come back unchanged.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+		// Same three grouped fields; no ordering is expected on the result.
+		assertNotNull(filtered);
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(0));
+		assertTrue(filtered.getGroupedFields().contains(2));
+		assertTrue(filtered.getGroupedFields().contains(3));
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testGroupingPreserved2() {
+		// With field string "3->0;5->2;1->3", the grouping request on fields 0, 2, 3 is expected to come back on fields 3, 5, 1.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+		// Only the field numbers are rewritten; no ordering is expected on the result.
+		assertNotNull(filtered);
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(3));
+		assertTrue(filtered.getGroupedFields().contains(5));
+		assertTrue(filtered.getGroupedFields().contains(1));
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testGroupingErased() {
+		// The field string "0;2" omits grouped field 3; the filtered request is expected to be null.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testOrderPreserved1() {
+		// The field string "1;4;6" lists every field of the requested ordering; the request is expected to come back intact.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo);
+		// Requested ordering, in key order: field 4 (descending), field 1 (ascending), field 6 (descending).
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+		// Field numbers, types and sort directions are all expected at their original key positions.
+		assertNotNull(filtered);
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getGroupedFields());
+	}
+
+	@Test
+	public void testOrderPreserved2() {
+		// With field string "5->1;0->4;2->6", the ordering on fields 4, 1, 6 is expected to come back on fields 0, 5, 2.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo);
+		// Requested ordering, in key order: field 4 (descending), field 1 (ascending), field 6 (descending).
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+		// Only the field numbers change; types and sort directions stay at the same key positions.
+		assertNotNull(filtered);
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getGroupedFields());
+	}
+
+	@Test
+	public void testOrderErased() {
+		// The field string "1; 4" omits ordering field 6; the filtered request is expected to be null.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testDualGroupingPreserved() {
+		// Two-input case: two field-string arrays are supplied, and each grouping request is filtered with its own input index (0 and 1).
+		DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"},
+				null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties lprops1 = new RequestedLocalProperties();
+		lprops1.setGroupedFields(new FieldSet(0,3,4));
+
+		RequestedLocalProperties lprops2 = new RequestedLocalProperties();
+		lprops2.setGroupedFields(new FieldSet(7, 1));
+
+		RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0);
+		RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1);
+		// Index 0 (matches the first field string): grouped fields 0, 3, 4 are expected to be rewritten to 1, 3, 2.
+		assertNotNull(filtered1);
+		assertNotNull(filtered1.getGroupedFields());
+		assertEquals(3, filtered1.getGroupedFields().size());
+		assertTrue(filtered1.getGroupedFields().contains(1));
+		assertTrue(filtered1.getGroupedFields().contains(2));
+		assertTrue(filtered1.getGroupedFields().contains(3));
+		assertNull(filtered1.getOrdering());
+		// Index 1 (matches the second field string): grouped fields 7, 1 are expected to be rewritten to 0, 1.
+		assertNotNull(filtered2);
+		assertNotNull(filtered2.getGroupedFields());
+		assertEquals(2, filtered2.getGroupedFields().size());
+		assertTrue(filtered2.getGroupedFields().contains(0));
+		assertTrue(filtered2.getGroupedFields().contains(1));
+		assertNull(filtered2.getOrdering());
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+		// sProps is a single-input properties object, yet input index 1 is passed below; an IndexOutOfBoundsException is expected.
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(1, 4));
+
+		rlProp.filterBySemanticProperties(sProps, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
new file mode 100644
index 0000000..b359e6b
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerTestBase;
+
+
+@SuppressWarnings({"serial", "unchecked"})
+public class DeltaIterationDependenciesTest extends CompilerTestBase {
+
+	@Test
+	public void testExceptionWhenNewWorksetNotDependentOnWorkset() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			// Builds a delta iteration whose next workset never reads the workset and expects compilation to fail.
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration = input.iterateDelta(input, 10,0);
+			// the solution set delta joins the solution set with the workset
+			DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
+														.where(0).equalTo(0)
+														.projectFirst(1).projectSecond(1);
+			// the next workset joins the solution set with 'input' only; the workset is not used here
+			DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
+														.where(0).equalTo(0)
+														.projectFirst(1).projectSecond(1);
+			
+
+			DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
+
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("Should not be able to compile, since the next workset does not depend on the workset");
+			}
+			catch (CompilerException e) {
+				// expected: compilation is rejected with a CompilerException
+			}
+			catch (Exception e) {
+				fail("wrong exception type");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
new file mode 100644
index 0000000..de02836
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+@SuppressWarnings("serial")
+public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
+	
+	@Test
+	public void testDistinctPreservesPartitioningOfDistinctFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(4);
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+					.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+			// distinct and the following grouping both use field 0
+			data.distinct(0)
+				.groupBy(0)
+				.sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// walk the plan backwards from the sink: sink <- reducer <- distinctReducer
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			// reducer can be forward, reuses partitioning from distinct
+			assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+			
+			// distinct reducer is partitioned
+			assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDistinctDestroysPartitioningOfNonDistinctFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(4);
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+					.map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+			// distinct uses field 1, while the following grouping uses field 0
+			data.distinct(1)
+				.groupBy(0)
+				.sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// walk the plan backwards from the sink: sink <- reducer <- combiner <- distinctReducer
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+			SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			
+			// reducer must repartition, because it works on a different field
+			assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+			// the combiner's own input is expected to be a FORWARD connection
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+			
+			// distinct reducer is partitioned
+			assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
new file mode 100644
index 0000000..a683968
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+	@Test
+	public void testAllGroupReduceNoCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			// NOTE(review): parallelism 8 is set here, yet all nodes are asserted to run with 1 below -- presumably fromElements yields a non-parallel source; confirm
+			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+			
+			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
+				public void reduce(Iterable<Double> values, Collector<Double> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			
+			// the all-reduce has no combiner, when the DOP of the input is one
+			
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// check wiring: the reducer reads directly from the source, i.e. no combiner in between
+			assertEquals(sourceNode, reduceNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that reduce has the right strategy
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check DOP
+			assertEquals(1, sourceNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAllReduceWithCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+			
+			GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
+				public void reduce(Iterable<Long> values, Collector<Long> out) {}
+			}).name("reducer");
+			// NOTE(review): setCombinable(true) is presumably what yields the combiner node asserted below -- confirm
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner (the node feeding the reducer)
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that the reducer uses ALL_GROUP_REDUCE and the combiner the matching ALL_GROUP_REDUCE_COMBINE
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
+			
+			// check DOP: source and combiner run with 8, the all-reduce and the sink with 1
+			assertEquals(8, sourceNode.getParallelism());
+			assertEquals(8, combineNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	
+	@Test
+	public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(1)
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// check wiring: the reducer reads directly from the source, i.e. no combiner in between
+			assertEquals(sourceNode, reduceNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that reduce has the right strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP: the source keeps its explicit 6, reducer and sink use the environment's 8
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithFieldPositionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+					.groupBy(1)
+					.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer");
+			
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner (the node feeding the reducer)
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that the reducer uses SORTED_GROUP_REDUCE and the combiner the matching SORTED_GROUP_COMBINE
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(1));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP: source and combiner run with 6, reducer and sink with 8
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
+					public String getKey(Tuple2<String, Double> value) { return value.f0; }
+				})
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			
+			// check wiring (NOTE(review): the keyProjector assertion compares the value with the expression it was assigned from)
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+			
+			// check that reduce has the right strategy
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
+					public String getKey(Tuple2<String, Double> value) { return value.f0; }
+				})
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+			}).name("reducer");
+			
+			reduced.setCombinable(true);
+			reduced.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner (the node feeding the reducer)
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			
+			// check wiring (NOTE(review): the keyProjector assertion compares the value with the expression it was assigned from)
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+			
+			// check that the reducer uses SORTED_GROUP_REDUCE and the combiner the matching SORTED_GROUP_COMBINE
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(1));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
new file mode 100644
index 0000000..37a8e81
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class IterationCompilerTest extends CompilerTestBase {
+
+	@Test
+	public void testIdentityIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(43);
+			// the iteration is closed with itself, so there is no operator inside the step function
+			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
+			iteration.closeWith(iteration).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// no assertions on the plan: the test passes if optimization and job graph generation do not throw
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testEmptyWorksetIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(43);
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value){ return null; }
+					});
+					
+			// the workset itself is passed as both arguments of closeWith, with no operator in between
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
+			iter.closeWith(iter.getWorkset(), iter.getWorkset())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			// no assertions on the plan: the test passes if optimization and job graph generation do not throw
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testIterationWithUnionRoot() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(43);
+			
+			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
+			// the iteration result is a union of two identity-mapped branches of the partial solution
+			iteration.closeWith(
+					iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
+					.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterNode = (BulkIterationPlanNode) sink.getInput().getSource();
+			
+			// make sure that the root is part of the dynamic path
+			
+			// the "NoOp" that comes after the union.
+			SingleInputPlanNode noop = (SingleInputPlanNode) iterNode.getRootOfStepFunction();
+			NAryUnionPlanNode union = (NAryUnionPlanNode) noop.getInput().getSource();
+			
+			assertTrue(noop.isOnDynamicPath());
+			assertTrue(noop.getCostWeight() >= 1);
+			
+			assertTrue(union.isOnDynamicPath());
+			assertTrue(union.getCostWeight() >= 1);
+			
+			// see that the jobgraph generator can translate this
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWorksetIterationWithUnionRoot() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(43);
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value){ return null; }
+					});
+					
+			// both closeWith arguments are unions of two identity-mapped copies of the workset
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
+			iter.closeWith(
+					iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())
+				.union(
+					iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
+				, iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())
+				.union(
+						iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
+				)
+			.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) sink.getInput().getSource();
+			
+			// make sure that the root is part of the dynamic path
+			
+			// the "NoOp"s that come after the unions.
+			SingleInputPlanNode nextWorksetNoop = (SingleInputPlanNode) iterNode.getNextWorkSetPlanNode();
+			SingleInputPlanNode solutionDeltaNoop = (SingleInputPlanNode) iterNode.getSolutionSetDeltaPlanNode();
+			
+			NAryUnionPlanNode nextWorksetUnion = (NAryUnionPlanNode) nextWorksetNoop.getInput().getSource();
+			NAryUnionPlanNode solutionDeltaUnion = (NAryUnionPlanNode) solutionDeltaNoop.getInput().getSource();
+			
+			assertTrue(nextWorksetNoop.isOnDynamicPath());
+			assertTrue(nextWorksetNoop.getCostWeight() >= 1);
+			
+			assertTrue(solutionDeltaNoop.isOnDynamicPath());
+			assertTrue(solutionDeltaNoop.getCostWeight() >= 1);
+			
+			assertTrue(nextWorksetUnion.isOnDynamicPath());
+			assertTrue(nextWorksetUnion.getCostWeight() >= 1);
+			
+			assertTrue(solutionDeltaUnion.isOnDynamicPath());
+			assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
+			// see that the jobgraph generator can translate this
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
new file mode 100644
index 0000000..0a62132
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinTranslationTest extends CompilerTestBase {
+
+	@Test
+	public void testBroadcastHashFirstTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_FIRST);
+			assertEquals(ShipStrategyType.BROADCAST, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBroadcastHashSecondTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_SECOND);
+			assertEquals(ShipStrategyType.FORWARD, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.BROADCAST, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionHashFirstTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_FIRST);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionHashSecondTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_SECOND);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionSortMergeTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertEquals(DriverStrategy.MERGE, node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testOptimizerChoosesTest() {
+		try {
+			DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.OPTIMIZER_CHOOSES);
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+			assertTrue(DriverStrategy.HYBRIDHASH_BUILD_FIRST == node.getDriverStrategy() ||
+					DriverStrategy.HYBRIDHASH_BUILD_SECOND == node.getDriverStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+		}
+	}
+	
+	
+	private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Long> i1 = env.generateSequence(1, 1000);
+		DataSet<Long> i2 = env.generateSequence(1, 1000);
+		
+		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+		
+		Plan plan = env.createProgramPlan();
+		
+		// set statistics to the sources
+		plan.accept(new Visitor<Operator<?>>() {
+			@Override
+			public boolean preVisit(Operator<?> visitable) {
+				if (visitable instanceof GenericDataSourceBase) {
+					GenericDataSourceBase<?, ?> source = (GenericDataSourceBase<?, ?>) visitable;
+					setSourceStatistics(source, 10000000, 1000);
+				}
+				
+				return true;
+			}
+			
+			@Override
+			public void postVisit(Operator<?> visitable) {}
+		});
+		
+		OptimizedPlan op = compileWithStats(plan);
+		
+		return (DualInputPlanNode) ((SinkPlanNode) op.getDataSinks().iterator().next()).getInput().getSource();
+	}
+	
+	
+	
+	private static final class IdentityKeySelector<T> implements KeySelector<T, T> {
+		
+		@Override
+		public T getKey(T value) {
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
new file mode 100644
index 0000000..cd63b72
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class OpenIterationTest extends CompilerTestBase {
+
+	@Test
+	public void testSinkInOpenBulkIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			
+			IterativeDataSet<Long> iteration = input.iterate(10);
+			
+			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkInClosedBulkIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			
+			IterativeDataSet<Long> iteration = input.iterate(10);
+			
+			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+			
+			iteration.closeWith(mapped).print();
+			
+			mapped.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			try {
+				compileNoStats(p);
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkOnSolutionSetDeltaIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSinkOnWorksetDeltaIteration() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			mapped.print();
+			
+			try {
+				env.createProgramPlan();
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testOperationOnSolutionSet() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+			
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+			
+			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+			
+			DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped)
+												.where(0).equalTo(0).projectFirst(1).projectSecond(0);
+			
+			iteration.closeWith(joined, joined)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			try {
+				compileNoStats(p);
+				fail("should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
new file mode 100644
index 0000000..8bb9a76
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionOperatorTest extends CompilerTestBase {
+
+	@Test
+	public void testPartitionOperatorPreservesFields() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L)));
+			
+			data.partitionCustom(new Partitioner<Long>() {
+					public int partition(Long key, int numPartitions) { return key.intValue(); }
+				}, 1)
+				.groupBy(1)
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();
+
+			assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
new file mode 100644
index 0000000..0724a9f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+	@Test
+	public void testAllReduceNoCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+			
+			data.reduce(new RichReduceFunction<Double>() {
+				
+				@Override
+				public Double reduce(Double value1, Double value2){
+					return value1 + value2;
+				}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			
+			// the all-reduce has no combiner when the DOP of the input is one
+			
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// check wiring
+			assertEquals(sourceNode, reduceNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check DOP
+			assertEquals(1, sourceNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testAllReduceWithCombiner() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+			
+			data.reduce(new RichReduceFunction<Long>() {
+				
+				@Override
+				public Long reduce(Long value1, Long value2){
+					return value1 + value2;
+				}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
+			
+			// check DOP
+			assertEquals(8, sourceNode.getParallelism());
+			assertEquals(8, combineNode.getParallelism());
+			assertEquals(1, reduceNode.getParallelism());
+			assertEquals(1, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithFieldPositionKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(1)
+				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
+				@Override
+				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+					return null;
+				}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGroupedReduceWithSelectorFunctionKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+				.name("source").setParallelism(6);
+			
+			data
+				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
+					public String getKey(Tuple2<String, Double> value) { return value.f0; }
+				})
+				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
+				@Override
+				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+					return null;
+				}
+			}).name("reducer")
+			.print().name("sink");
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+			
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+			
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+			
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+			
+			// check wiring
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+			
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+			
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+			
+			// check DOP
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, keyExtractor.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, keyProjector.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+}


Mime
View raw message