flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [09/13] flink git commit: [FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions. - Renamed constantField annotations to forwardedFields annotation - Forwarded fields can be defined for (nested) tuples, Po
Date Wed, 28 Jan 2015 01:24:09 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..397dd3d
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/LocalPropertiesFilteringTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+/**
+ * Tests how {@link LocalProperties} (grouping, ordering, unique fields) are filtered
+ * through {@link SingleInputSemanticProperties} built from forwarded-field strings:
+ * properties whose fields are all forwarded survive (possibly renamed to their target
+ * positions), while properties touching any non-forwarded field are erased.
+ */
+public class LocalPropertiesFilteringTest {
+
+	// Flat schema of eight int fields (indices 0..7) shared by all tests.
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	// No forwarded fields at all -> grouping and both unique-field sets must be dropped.
+	@Test
+	public void testAllErased1() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+		lProps = lProps.addUniqueFields(new FieldSet(5,6));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Only field 5 is forwarded -> grouping on {0,1,2} and uniqueness on {3,4} are erased.
+	@Test
+	public void testAllErased2() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"5"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 1, 2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// All grouping fields forwarded to their own positions -> grouping survives unchanged.
+	@Test
+	public void testGroupingPreserved1() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(0));
+		assertTrue(filtered.getGroupedFields().contains(2));
+		assertTrue(filtered.getGroupedFields().contains(3));
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Grouping fields forwarded to new positions (0->4, 2->0, 3->7) -> grouping survives on the target indices.
+	@Test
+	public void testGroupingPreserved2() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0;3->7"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(4));
+		assertTrue(filtered.getGroupedFields().contains(0));
+		assertTrue(filtered.getGroupedFields().contains(7));
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Grouping field 3 is not forwarded -> the entire grouping property is erased.
+	@Test
+	public void testGroupingErased() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->4;2->0"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(0, 2, 3));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// All ordering fields forwarded unchanged -> field sequence, directions, and types fully preserved.
+	@Test
+	public void testSortingPreserved1() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2;5"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+		LocalProperties lProps = LocalProperties.forOrdering(o);
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldList gFields = filtered.getGroupedFields();
+		Ordering order = filtered.getOrdering();
+
+		assertNotNull(gFields);
+		assertEquals(3, gFields.size());
+		assertTrue(gFields.contains(0));
+		assertTrue(gFields.contains(2));
+		assertTrue(gFields.contains(5));
+		assertNotNull(order);
+		assertEquals(3, order.getNumberOfFields());
+		assertEquals(2, order.getFieldNumber(0).intValue());
+		assertEquals(0, order.getFieldNumber(1).intValue());
+		assertEquals(5, order.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, order.getOrder(0));
+		assertEquals(Order.DESCENDING, order.getOrder(1));
+		assertEquals(Order.DESCENDING, order.getOrder(2));
+		assertEquals(IntValue.class, order.getType(0));
+		assertEquals(StringValue.class, order.getType(1));
+		assertEquals(LongValue.class, order.getType(2));
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Ordering fields forwarded to new positions (0->3, 2->7, 5->1) -> same order on the target indices.
+	@Test
+	public void testSortingPreserved2() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->3;2->7;5->1"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+		LocalProperties lProps = LocalProperties.forOrdering(o);
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldList gFields = filtered.getGroupedFields();
+		Ordering order = filtered.getOrdering();
+
+		assertNotNull(gFields);
+		assertEquals(3, gFields.size());
+		assertTrue(gFields.contains(3));
+		assertTrue(gFields.contains(7));
+		assertTrue(gFields.contains(1));
+		assertNotNull(order);
+		assertEquals(3, order.getNumberOfFields());
+		assertEquals(7, order.getFieldNumber(0).intValue());
+		assertEquals(3, order.getFieldNumber(1).intValue());
+		assertEquals(1, order.getFieldNumber(2).intValue());
+		assertEquals(Order.ASCENDING, order.getOrder(0));
+		assertEquals(Order.DESCENDING, order.getOrder(1));
+		assertEquals(Order.DESCENDING, order.getOrder(2));
+		assertEquals(IntValue.class, order.getType(0));
+		assertEquals(StringValue.class, order.getType(1));
+		assertEquals(LongValue.class, order.getType(2));
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Only a prefix (fields 2, 0) of the ordering is forwarded -> ordering is truncated to that prefix.
+	@Test
+	public void testSortingPreserved3() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+		LocalProperties lProps = LocalProperties.forOrdering(o);
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldList gFields = filtered.getGroupedFields();
+		Ordering order = filtered.getOrdering();
+
+		assertNotNull(gFields);
+		assertEquals(2, gFields.size());
+		assertTrue(gFields.contains(0));
+		assertTrue(gFields.contains(2));
+		assertNotNull(order);
+		assertEquals(2, order.getNumberOfFields());
+		assertEquals(2, order.getFieldNumber(0).intValue());
+		assertEquals(0, order.getFieldNumber(1).intValue());
+		assertEquals(Order.ASCENDING, order.getOrder(0));
+		assertEquals(Order.DESCENDING, order.getOrder(1));
+		assertEquals(IntValue.class, order.getType(0));
+		assertEquals(StringValue.class, order.getType(1));
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Only the leading ordering field is forwarded (2->7); field 5 alone cannot extend the
+	// prefix past the missing field 0 -> ordering is truncated to the single renamed field.
+	@Test
+	public void testSortingPreserved4() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"2->7;5"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+		LocalProperties lProps = LocalProperties.forOrdering(o);
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldList gFields = filtered.getGroupedFields();
+		Ordering order = filtered.getOrdering();
+
+		assertNotNull(gFields);
+		assertEquals(1, gFields.size());
+		assertTrue(gFields.contains(7));
+		assertNotNull(order);
+		assertEquals(1, order.getNumberOfFields());
+		assertEquals(7, order.getFieldNumber(0).intValue());
+		assertEquals(Order.ASCENDING, order.getOrder(0));
+		assertEquals(IntValue.class, order.getType(0));
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// The leading ordering field 2 is not forwarded -> ordering (and derived grouping) erased entirely.
+	@Test
+	public void testSortingErased() {
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;5"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(2, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(0, StringValue.class, Order.DESCENDING);
+		o.appendOrdering(5, LongValue.class, Order.DESCENDING);
+		LocalProperties lProps = LocalProperties.forOrdering(o);
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldList gFields = filtered.getGroupedFields();
+		Ordering order = filtered.getOrdering();
+
+		assertNull(gFields);
+		assertNull(order);
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Unique sets {0,1,2} and {3,4} are fully forwarded and survive; {4,5,6} loses
+	// fields 5 and 6 and is dropped.
+	@Test
+	public void testUniqueFieldsPreserved1() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = new LocalProperties();
+		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldSet expected1 = new FieldSet(0,1,2);
+		FieldSet expected2 = new FieldSet(3,4);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNotNull(filtered.getUniqueFields());
+		assertEquals(2, filtered.getUniqueFields().size());
+		assertTrue(filtered.getUniqueFields().contains(expected1));
+		assertTrue(filtered.getUniqueFields().contains(expected2));
+	}
+
+	// Same unique-field scenario as above, with a grouping on {1,2} that is also preserved.
+	@Test
+	public void testUniqueFieldsPreserved2() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;2;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = LocalProperties.forGrouping(new FieldList(1,2));
+		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldSet expected1 = new FieldSet(0,1,2);
+		FieldSet expected2 = new FieldSet(3,4);
+
+		assertNull(filtered.getOrdering());
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(2, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(1));
+		assertTrue(filtered.getGroupedFields().contains(2));
+		assertNotNull(filtered.getUniqueFields());
+		assertEquals(2, filtered.getUniqueFields().size());
+		assertTrue(filtered.getUniqueFields().contains(expected1));
+		assertTrue(filtered.getUniqueFields().contains(expected2));
+	}
+
+	// Unique sets renamed by forwarding (0->7, 1->6, 2->5, 3->4, 4->3): {0,1,2} becomes
+	// {5,6,7}, {3,4} stays {3,4} (swapped), and {4,5,6} is dropped (5 and 6 not forwarded).
+	@Test
+	public void testUniqueFieldsPreserved3() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0->7;1->6;2->5;3->4;4->3"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = new LocalProperties();
+		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+		FieldSet expected1 = new FieldSet(5,6,7);
+		FieldSet expected2 = new FieldSet(3,4);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNotNull(filtered.getUniqueFields());
+		assertEquals(2, filtered.getUniqueFields().size());
+		assertTrue(filtered.getUniqueFields().contains(expected1));
+		assertTrue(filtered.getUniqueFields().contains(expected2));
+	}
+
+	// No unique set has all of its fields forwarded (2, 3, and 5/6 are missing) -> all dropped.
+	@Test
+	public void testUniqueFieldsErased() {
+
+		SingleInputSemanticProperties sp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sp, new String[]{"0;1;4"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lProps = new LocalProperties();
+		lProps = lProps.addUniqueFields(new FieldSet(0,1,2));
+		lProps = lProps.addUniqueFields(new FieldSet(3,4));
+		lProps = lProps.addUniqueFields(new FieldSet(4,5,6));
+
+		LocalProperties filtered = lProps.filterBySemanticProperties(sp, 0);
+
+		assertNull(filtered.getGroupedFields());
+		assertNull(filtered.getOrdering());
+		assertNull(filtered.getUniqueFields());
+	}
+
+	// Single-input semantic properties only have input 0; querying input 1 must throw.
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+
+		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		LocalProperties lprops = LocalProperties.forGrouping(new FieldList(0,1));
+
+		lprops.filterBySemanticProperties(sprops, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
new file mode 100644
index 0000000..2f438cf
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockDistribution.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+
+import java.io.IOException;
+
+/**
+ * Minimal no-op {@link DataDistribution} for tests that only need a distribution
+ * instance (e.g. to attach to a range-partitioning request), not meaningful
+ * bucket boundaries or serialized state.
+ */
+public class MockDistribution implements DataDistribution {
+
+	@Override
+	public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+		// No real boundaries — an empty array is sufficient for the tests using this mock.
+		return new Key<?>[0];
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return 0;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// Intentionally empty: the mock carries no state to serialize.
+
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// Intentionally empty: nothing to deserialize.
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
index 71e4c3a..1245398 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/MockPartitioner.java
@@ -19,13 +19,14 @@
 package org.apache.flink.compiler.dataproperties;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.tuple.Tuple2;
 
-class MockPartitioner implements Partitioner<Long> {
+class MockPartitioner implements Partitioner<Tuple2<Long, Integer>> {
 	
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public int partition(Long key, int numPartitions) {
+	public int partition(Tuple2<Long, Integer> key, int numPartitions) {
 		return 0;
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..f9acabb
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class RequestedGlobalPropertiesFilteringTest {
+
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	@Test(expected = NullPointerException.class)
+	public void testNullProps() {
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(null, 0);
+	}
+
+	@Test
+	public void testEraseAll1() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testEraseAll2() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testHashPartitioningPreserved1() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(0));
+		assertTrue(filtered.getPartitionedFields().contains(3));
+		assertTrue(filtered.getPartitionedFields().contains(4));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testHashPartitioningPreserved2() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(1));
+		assertTrue(filtered.getPartitionedFields().contains(2));
+		assertTrue(filtered.getPartitionedFields().contains(7));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testHashPartitioningErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved1() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(0));
+		assertTrue(filtered.getPartitionedFields().contains(3));
+		assertTrue(filtered.getPartitionedFields().contains(4));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testAnyPartitioningPreserved2() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+		assertNotNull(filtered.getPartitionedFields());
+		assertEquals(3, filtered.getPartitionedFields().size());
+		assertTrue(filtered.getPartitionedFields().contains(1));
+		assertTrue(filtered.getPartitionedFields().contains(2));
+		assertTrue(filtered.getPartitionedFields().contains(7));
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testAnyPartitioningErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRangePartitioningPreserved1() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningPreserved2() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getDataDistribution());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningPreserved3() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+		DataDistribution dd = new MockDistribution();
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o, dd);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNotNull(filtered);
+		assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNotNull(filtered.getDataDistribution());
+		assertEquals(dd, filtered.getDataDistribution());
+		assertNull(filtered.getPartitionedFields());
+		assertNull(filtered.getCustomPartitioner());
+	}
+
+	@Test
+	public void testRangePartitioningErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRangePartitioned(o);
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testCustomPartitioningErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner());
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRandomDistributionErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setRandomDistribution();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testReplicationErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setFullyReplicated();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testRebalancingErased() {
+
+		SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+		rgProps.setForceRebalancing();
+
+		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testDualHashPartitioningPreserved() {
+
+		DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"},
+				null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+		RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+		gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+		gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+		RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0);
+		RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1);
+
+		assertNotNull(filtered1);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning());
+		assertNotNull(filtered1.getPartitionedFields());
+		assertEquals(3, filtered1.getPartitionedFields().size());
+		assertTrue(filtered1.getPartitionedFields().contains(0));
+		assertTrue(filtered1.getPartitionedFields().contains(2));
+		assertTrue(filtered1.getPartitionedFields().contains(4));
+		assertNull(filtered1.getOrdering());
+		assertNull(filtered1.getCustomPartitioner());
+		assertNull(filtered1.getDataDistribution());
+
+		assertNotNull(filtered2);
+		assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning());
+		assertNotNull(filtered2.getPartitionedFields());
+		assertEquals(3, filtered2.getPartitionedFields().size());
+		assertTrue(filtered2.getPartitionedFields().contains(1));
+		assertTrue(filtered2.getPartitionedFields().contains(3));
+		assertTrue(filtered2.getPartitionedFields().contains(4));
+		assertNull(filtered2.getOrdering());
+		assertNull(filtered2.getCustomPartitioner());
+		assertNull(filtered2.getDataDistribution());
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+		gprops.setHashPartitioned(new FieldList(0,1));
+
+		gprops.filterBySemanticProperties(sprops, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..f3aaf05
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedLocalPropertiesFilteringTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RequestedLocalPropertiesFilteringTest {
+
+	private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+			new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+			);
+
+	@Test(expected = NullPointerException.class)
+	public void testNullProps() {
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(null, 0);
+	}
+
+	@Test
+	public void testAllErased() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testGroupingPreserved1() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNotNull(filtered);
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(0));
+		assertTrue(filtered.getGroupedFields().contains(2));
+		assertTrue(filtered.getGroupedFields().contains(3));
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testGroupingPreserved2() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNotNull(filtered);
+		assertNotNull(filtered.getGroupedFields());
+		assertEquals(3, filtered.getGroupedFields().size());
+		assertTrue(filtered.getGroupedFields().contains(3));
+		assertTrue(filtered.getGroupedFields().contains(5));
+		assertTrue(filtered.getGroupedFields().contains(1));
+		assertNull(filtered.getOrdering());
+	}
+
+	@Test
+	public void testGroupingErased() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testOrderPreserved1() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNotNull(filtered);
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getGroupedFields());
+	}
+
+	@Test
+	public void testOrderPreserved2() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNotNull(filtered);
+		assertNotNull(filtered.getOrdering());
+		assertEquals(3, filtered.getOrdering().getNumberOfFields());
+		assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue());
+		assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue());
+		assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+		assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+		assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+		assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+		assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+		assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+		assertNull(filtered.getGroupedFields());
+	}
+
+	@Test
+	public void testOrderErased() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+		Ordering o = new Ordering();
+		o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+		o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+		o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setOrdering(o);
+
+		RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+		assertNull(filtered);
+	}
+
+	@Test
+	public void testDualGroupingPreserved() {
+
+		DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"},
+				null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties lprops1 = new RequestedLocalProperties();
+		lprops1.setGroupedFields(new FieldSet(0,3,4));
+
+		RequestedLocalProperties lprops2 = new RequestedLocalProperties();
+		lprops2.setGroupedFields(new FieldSet(7, 1));
+
+		RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0);
+		RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1);
+
+		assertNotNull(filtered1);
+		assertNotNull(filtered1.getGroupedFields());
+		assertEquals(3, filtered1.getGroupedFields().size());
+		assertTrue(filtered1.getGroupedFields().contains(1));
+		assertTrue(filtered1.getGroupedFields().contains(2));
+		assertTrue(filtered1.getGroupedFields().contains(3));
+		assertNull(filtered1.getOrdering());
+
+		assertNotNull(filtered2);
+		assertNotNull(filtered2.getGroupedFields());
+		assertEquals(2, filtered2.getGroupedFields().size());
+		assertTrue(filtered2.getGroupedFields().contains(0));
+		assertTrue(filtered2.getGroupedFields().contains(1));
+		assertNull(filtered2.getOrdering());
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidInputIndex() {
+
+		SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+		SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+		RequestedLocalProperties rlProp = new RequestedLocalProperties();
+		rlProp.setGroupedFields(new FieldSet(1, 4));
+
+		rlProp.filterBySemanticProperties(sProps, 1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
index 4d81e0b..3b9a7a2 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/WorksetIterationsJavaApiCompilerTest.java
@@ -278,19 +278,19 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 				}
 			})
 			.name(JOIN_WITH_SOLUTION_SET)
-			.withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
+			.withForwardedFieldsSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
 			
 		DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
 			.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
 				public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
 			})
 			.name(NEXT_WORKSET_REDUCER_NAME)
-			.withConstantSet("1->1","2->2","0->0");
+			.withForwardedFields("1->1","2->2","0->0");
 		
 		
 		DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
 				joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
-					.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
+					.name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") :
 				joinedWithSolutionSet;
 		
 		iter.closeWith(nextSolutionSet, nextWorkset)

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
index 6325788..9cdea6d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
@@ -59,7 +59,7 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT extends Function> exte
 	/**
 	 * Semantic properties of the associated function.
 	 */
-	private DualInputSemanticProperties semanticProperties;
+	private DualInputSemanticProperties semanticProperties = new DualInputSemanticProperties();
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
index fe53380..70fd753 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
@@ -27,20 +27,20 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 /**
  * Container for the semantic properties associated to a dual input operator.
  */
-public class DualInputSemanticProperties extends SemanticProperties {
+public class DualInputSemanticProperties implements SemanticProperties {
 	private static final long serialVersionUID = 1L;
-	
+
 	/**
 	 * Mapping from fields in the source record(s) in the first input to fields
 	 * in the destination record(s).  
 	 */
-	private Map<Integer,FieldSet> forwardedFields1;
+	private Map<Integer,FieldSet> fieldMapping1;
 	
 	/**
 	 * Mapping from fields in the source record(s) in the second input to fields
 	 * in the destination record(s).  
 	 */
-	private Map<Integer,FieldSet> forwardedFields2;
+	private Map<Integer,FieldSet> fieldMapping2;
 	
 	/**
 	 * Set of fields that are read in the source record(s) from the
@@ -56,293 +56,124 @@ public class DualInputSemanticProperties extends SemanticProperties {
 
 	
 	public DualInputSemanticProperties() {
-		init();
+		this.fieldMapping1 = new HashMap<Integer,FieldSet>();
+		this.fieldMapping2 = new HashMap<Integer,FieldSet>();
+		this.readFields1 = null;
+		this.readFields2 = null;
 	}
 
-	/**
-	 * Finds the source field where the given field was forwarded from.
-	 * @param dest The destination field in the output data.
-	 * @return FieldSet containing the source input fields.
-	 */
-	public FieldSet forwardedFrom1(int dest) {
-		FieldSet fs = null;
-		for (Map.Entry<Integer, FieldSet> entry : forwardedFields1.entrySet()) {
-			if (entry.getValue().contains(dest)) {
-				if (fs == null) {
-					fs = new FieldSet();
-				}
+	@Override
+	public FieldSet getForwardingTargetFields(int input, int sourceField) {
 
-				fs = fs.addField(entry.getKey());
-			}
+		if (input != 0 && input != 1) {
+			throw new IndexOutOfBoundsException();
+		} else if (input == 0) {
+
+			return fieldMapping1.containsKey(sourceField) ? fieldMapping1.get(sourceField) : FieldSet.EMPTY_SET;
+		} else {
+			return fieldMapping2.containsKey(sourceField) ? fieldMapping2.get(sourceField) : FieldSet.EMPTY_SET;
 		}
-		return fs;
 	}
 
-	public FieldSet forwardedFrom2(int dest) {
-		FieldSet fs = null;
-		for (Map.Entry<Integer, FieldSet> entry : forwardedFields2.entrySet()) {
-			if (entry.getValue().contains(dest)) {
-				if (fs == null) {
-					fs = new FieldSet();
-				}
+	@Override
+	public int getForwardingSourceField(int input, int targetField) {
+		Map<Integer, FieldSet> fieldMapping;
+
+		if (input != 0 && input != 1) {
+			throw new IndexOutOfBoundsException();
+		} else if (input == 0) {
+			fieldMapping = fieldMapping1;
+		} else {
+			fieldMapping = fieldMapping2;
+		}
 
-				fs = fs.addField(entry.getKey());
+		for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) {
+			if (e.getValue().contains(targetField)) {
+				return e.getKey();
 			}
 		}
-		return fs;
+		return -1;
 	}
 
-	/**
-	 * Adds, to the existing information, a field that is forwarded directly
-	 * from the source record(s) in the first input to the destination
-	 * record(s).
-	 * 
-	 * @param sourceField the position in the source record(s) from the first input
-	 * @param destinationField the position in the destination record(s)
-	 */
-	public void addForwardedField1(int sourceField, int destinationField) {
-		FieldSet old = this.forwardedFields1.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
+	@Override
+	public FieldSet getReadFields(int input) {
+		if (input != 0 && input != 1) {
+			throw new IndexOutOfBoundsException();
 		}
-		
-		FieldSet fs = old.addField(destinationField);
-		this.forwardedFields1.put(sourceField, fs);
-	}
-	
-	/**
-	 * Adds, to the existing information, a field that is forwarded directly
-	 * from the source record(s) in the first input to multiple fields in
-	 * the destination record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void addForwardedField1(int sourceField, FieldSet destinationFields) {
-		FieldSet old = this.forwardedFields1.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
+
+		if (input == 0) {
+			return readFields1;
+		} else {
+			return readFields2;
 		}
-		
-		FieldSet fs = old.addFields(destinationFields);
-		this.forwardedFields1.put(sourceField, fs);
 	}
-	
-	/**
-	 * Sets a field that is forwarded directly from the source
-	 * record(s) in the first input to multiple fields in the
-	 * destination record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void setForwardedField1(int sourceField, FieldSet destinationFields) {
-		this.forwardedFields1.put(sourceField, destinationFields);
-	}
-	
-	/**
-	 * Gets the fields in the destination record where the source
-	 * field from the first input is forwarded.
-	 * 
-	 * @param sourceField the position in the source record
-	 * @return the destination fields, or null if they do not exist
-	 */
-	public FieldSet getForwardedField1(int sourceField) {
-		if (isAllFieldsConstant()) {
-			return new FieldSet(sourceField);
-		}
 
-		return this.forwardedFields1.get(sourceField);
-	}
-	
 	/**
 	 * Adds, to the existing information, a field that is forwarded directly
-	 * from the source record(s) in the second input to the destination
+	 * from the source record(s) in the first input to the destination
 	 * record(s).
-	 * 
-	 * @param sourceField the position in the source record(s) from the first input
-	 * @param destinationField the position in the destination record(s)
-	 */
-	public void addForwardedField2(int sourceField, int destinationField) {
-		FieldSet old = this.forwardedFields2.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
-		}
-		
-		FieldSet fs = old.addField(destinationField);
-		this.forwardedFields2.put(sourceField, fs);
-	}
-	
-	/**
-	 * Adds, to the existing information, a field that is forwarded directly
-	 * from the source record(s) in the second input to multiple fields in
-	 * the destination record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void addForwardedField2(int sourceField, FieldSet destinationFields) {
-		FieldSet old = this.forwardedFields2.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
-		}
-		
-		FieldSet fs = old.addFields(destinationFields);
-		this.forwardedFields2.put(sourceField, fs);
-	}
-	
-	/**
-	 * Sets a field that is forwarded directly from the source
-	 * record(s) in the second input to multiple fields in the
-	 * destination record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void setForwardedField2(int sourceField, FieldSet destinationFields) {
-		this.forwardedFields2.put(sourceField, destinationFields);
-	}
-	
-	/**
-	 * Gets the fields in the destination record where the source
-	 * field from the second input is forwarded.
-	 * 
+	 *
+	 * @param input the input of the source field
 	 * @param sourceField the position in the source record
-	 * @return the destination fields, or null if they do not exist
+	 * @param targetField the position in the destination record
 	 */
-	public FieldSet getForwardedField2(int sourceField) {
-		if (isAllFieldsConstant()) {
-			return new FieldSet(sourceField);
-		}
+	public void addForwardedField(int input, int sourceField, int targetField) {
 
-		return this.forwardedFields2.get(sourceField);
-	}
+		Map<Integer, FieldSet> fieldMapping;
 
-	@Override
-	public FieldSet getSourceField(int input, int field) {
-		if (isAllFieldsConstant()) {
-			return new FieldSet(field);
+		if (input != 0 && input != 1) {
+			throw new IndexOutOfBoundsException();
+		} else if (input == 0) {
+			fieldMapping = this.fieldMapping1;
+		} else {
+			fieldMapping = this.fieldMapping2;
 		}
 
-		switch(input) {
-			case 0:
-				return this.forwardedFrom1(field);
-			case 1:
-				return this.forwardedFrom2(field);
-			default:
-				throw new IndexOutOfBoundsException();
+		if(isTargetFieldPresent(targetField, fieldMapping)) {
+			throw new InvalidSemanticAnnotationException("Target field "+targetField+" was added twice to input "+input);
 		}
-	}
 
-	@Override
-	public FieldSet getForwardFields(int input, int field) {
-		if (isAllFieldsConstant()) {
-			return new FieldSet(field);
+		FieldSet targetFields = fieldMapping.get(sourceField);
+		if (targetFields != null) {
+			fieldMapping.put(sourceField, targetFields.addField(targetField));
+		} else {
+			fieldMapping.put(sourceField, new FieldSet(targetField));
 		}
+	}
 
-		if (input == 0) {
-			return this.getForwardedField1(field);
-		} else if (input == 1) {
-			return this.getForwardedField2(field);
+	private boolean isTargetFieldPresent(int targetField, Map<Integer, FieldSet> fieldMapping) {
+
+		for(FieldSet targetFields : fieldMapping.values()) {
+			if(targetFields.contains(targetField)) {
+				return true;
+			}
 		}
-		return null;
+		return false;
 	}
 
 	/**
 	 * Adds, to the existing information, field(s) that are read in
 	 * the source record(s) from the first input.
-	 * 
-	 * @param readFields the position(s) in the source record(s)
-	 */
-	public void addReadFields1(FieldSet readFields) {
-		if (this.readFields1 == null) {
-			this.readFields1 = readFields;
-		} else {
-			this.readFields1 = this.readFields2.addFields(readFields);
-		}
-	}
-	
-	/**
-	 * Sets the field(s) that are read in the source record(s) from the first
-	 * input.
-	 * 
-	 * @param readFields the position(s) in the source record(s)
-	 */
-	public void setReadFields1(FieldSet readFields) {
-		this.readFields1 = readFields;
-	}
-	
-	/**
-	 * Gets the field(s) in the source record(s) from the first input
-	 * that are read.
-	 * 
-	 * @return the field(s) in the record, or null if they are not set
-	 */
-	public FieldSet getReadFields1() {
-		return this.readFields1;
-	}
-	
-	/**
-	 * Adds, to the existing information, field(s) that are read in
-	 * the source record(s) from the second input.
-	 * 
+	 *
+	 * @param input the input of the read fields
 	 * @param readFields the position(s) in the source record(s)
 	 */
-	public void addReadFields2(FieldSet readFields) {
-		if (this.readFields2 == null) {
-			this.readFields2 = readFields;
+	public void addReadFields(int input, FieldSet readFields) {
+
+		// merge the given fields into the read-field set of the selected input
+
+		if (input != 0 && input != 1) {
+			throw new IndexOutOfBoundsException();
+		} else if (input == 0) {
+			this.readFields1 = (this.readFields1 == null) ? readFields.clone() : this.readFields1.addFields(readFields);
 		} else {
-			this.readFields2 = this.readFields2.addFields(readFields);
+			this.readFields2 = (this.readFields2 == null) ? readFields.clone() : this.readFields2.addFields(readFields);
 		}
 	}
-	
-	/**
-	 * Sets the field(s) that are read in the source record(s) from the second
-	 * input.
-	 * 
-	 * @param readFields the position(s) in the source record(s)
-	 */
-	public void setReadFields2(FieldSet readFields) {
-		this.readFields2 = readFields;
-	}
-	
-	/**
-	 * Gets the field(s) in the source record(s) from the second input
-	 * that are read.
-	 * 
-	 * @return the field(s) in the record, or null if they are not set
-	 */
-	public FieldSet getReadFields2() {
-		return this.readFields2;
-	}
-	
-	/**
-	 * Clears the object.
-	 */
-	@Override
-	public void clearProperties() {
-		super.clearProperties();
-		init();
-	}
 
 	@Override
-	public boolean isEmpty() {
-		return super.isEmpty() &&
-				(forwardedFields1 == null || forwardedFields1.isEmpty()) &&
-				(forwardedFields2 == null || forwardedFields2.isEmpty()) &&
-				(readFields1 == null || readFields1.size() == 0) &&
-				(readFields2 == null || readFields2.size() == 0);
-	}
-	
-	@Override
 	public String toString() {
-		return "DISP(" + this.forwardedFields1 + "; " + this.forwardedFields2 + ")";
+		return "DISP(" + this.fieldMapping1 + "; " + this.fieldMapping2 + ")";
 	}
 
-	private void init() {
-		this.forwardedFields1 = new HashMap<Integer,FieldSet>();
-		this.forwardedFields2 = new HashMap<Integer,FieldSet>();
-		this.readFields1 = null;
-		this.readFields2 = null;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 7b90c8e..e99cac7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -144,7 +144,7 @@ public class Ordering {
 		}
 		
 		for (int i = 0; i < this.indexes.size(); i++) {
-			if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) {
+			if (this.indexes.get(i) != otherOrdering.indexes.get(i)) {
 				return false;
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
index da99018..5afee79 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SemanticProperties.java
@@ -21,72 +21,73 @@ package org.apache.flink.api.common.operators;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.util.FieldSet;
 
 /**
  * Container for the semantic properties associated to an operator.
  */
-public abstract class SemanticProperties implements Serializable {
-	private boolean allFieldsConstant;
+public interface SemanticProperties extends Serializable {
 
-	private static final long serialVersionUID = 1L;
+	/**
+	 * Returns the indexes of all target fields to which a source field has been
+	 * copied unmodified by a function.
+	 *
+	 * @param input The input id for the requested source field (0 for first input, 1 for second input)
+	 * @param sourceField The index of the field for which the target position index is requested.
+	 * @return A set containing the indexes of all target fields to which the source field has been copied unmodified.
+	 *
+	 */
+	public FieldSet getForwardingTargetFields(int input, int sourceField);
+
+	/**
+	 * Returns the index of the source field on the given input from which the target field
+	 * has been copied unmodified by a function.
+	 *
+	 * @param input The input id for the requested source field (0 for first input, 1 for second input)
+	 * @param targetField The index of the target field to which the source field has been copied.
+	 * @return The index of the source field on the given input that was copied to the given target field.
+	 * 			-1 if the target field was not copied from any source field of the given input.
+	 */
+	public int getForwardingSourceField(int input, int targetField);
 
-	/** Set of fields that are written in the destination record(s).*/
-	private FieldSet writtenFields;
-	
-	
 	/**
-	 * Adds, to the existing information, field(s) that are written in
-	 * the destination record(s).
-	 * 
-	 * @param writtenFields the position(s) in the destination record(s)
+	 * Returns the position indexes of all fields of an input that are accessed by a function.
+	 *
+	 * @param input The input id for which accessed fields are requested.
+	 * @return A set of fields of the specified input which have been accessed by the function. Null if no information is available.
 	 */
-	public void addWrittenFields(FieldSet writtenFields) {
-		if(this.writtenFields == null) {
-			this.writtenFields = writtenFields;
-		} else {
-			this.writtenFields = this.writtenFields.addFields(writtenFields);
+	public FieldSet getReadFields(int input);
+
+	// ----------------------------------------------------------------------
+
+	public static class InvalidSemanticAnnotationException extends InvalidProgramException {
+
+		public InvalidSemanticAnnotationException(String s) {
+			super(s);
 		}
-	}
 
-	public void setAllFieldsConstant(boolean constant) {
-		this.allFieldsConstant = constant;
+		public InvalidSemanticAnnotationException(String s, Throwable e) {
+			super(s,e);
+		}
 	}
 
-	public boolean isAllFieldsConstant() {
-		return this.allFieldsConstant;
-	}
+	public static class EmptySemanticProperties implements SemanticProperties {
+
+		@Override
+		public FieldSet getForwardingTargetFields(int input, int sourceField) {
+			return FieldSet.EMPTY_SET;
+		}
 
-	public abstract FieldSet getForwardFields(int input, int field);
+		@Override
+		public int getForwardingSourceField(int input, int targetField) {
+			return -1;
+		}
 
-	public abstract FieldSet getSourceField(int input, int field);
+		@Override
+		public FieldSet getReadFields(int input) {
+			return null;
+		}
 
-	/**
-	 * Sets the field(s) that are written in the destination record(s).
-	 * 
-	 * @param writtenFields the position(s) in the destination record(s)
-	 */
-	public void setWrittenFields(FieldSet writtenFields) {
-		this.writtenFields = writtenFields;
-	}
-	
-	/**
-	 * Gets the field(s) in the destination record(s) that are written.
-	 * 
-	 * @return the field(s) in the record, or null if they are not set
-	 */
-	public FieldSet getWrittenFields() {
-		return this.writtenFields;
-	}
-	
-	/**
-	 * Clears the object.
-	 */
-	public void clearProperties() {
-		this.writtenFields = null;
-	}
-	
-	public boolean isEmpty() {
-		return this.writtenFields == null || this.writtenFields.size() == 0;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index 5b491bf..eddf89b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -42,7 +42,7 @@ public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends
 	private final int[] keyFields;
 	
 	/** Semantic properties of the associated function. */
-	private SingleInputSemanticProperties semanticProperties;
+	private SingleInputSemanticProperties semanticProperties = new SingleInputSemanticProperties();
 	
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index abe995b..23bbc8b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -26,120 +26,90 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 /**
  * Container for the semantic properties associated to a single input operator.
  */
-public class SingleInputSemanticProperties extends SemanticProperties {
-	
+public class SingleInputSemanticProperties implements SemanticProperties {
 	private static final long serialVersionUID = 1L;
-	
-	/**Mapping from fields in the source record(s) to fields in the destination record(s). */
-	private Map<Integer,FieldSet> forwardedFields;
-	
-	/** Set of fields that are read in the source record(s).*/
+
+	/**
+	 * Mapping from fields in the source record(s) to fields in the destination
+	 * record(s).
+	 */
+	private Map<Integer,FieldSet> fieldMapping;
+
+	/**
+	 * Set of fields that are read in the source record(s).
+	 */
 	private FieldSet readFields;
 
+	public SingleInputSemanticProperties() {
+		this.fieldMapping = new HashMap<Integer, FieldSet>();
+		this.readFields = null;
+	}
+
 	@Override
-	public FieldSet getForwardFields(int input, int field) {
+	public FieldSet getForwardingTargetFields(int input, int sourceField) {
 		if (input != 0) {
 			throw new IndexOutOfBoundsException();
 		}
-		return this.getForwardedField(field);
+
+		return this.fieldMapping.containsKey(sourceField) ? this.fieldMapping.get(sourceField) : FieldSet.EMPTY_SET;
 	}
 
 	@Override
-	public FieldSet getSourceField(int input, int field) {
+	public int getForwardingSourceField(int input, int targetField) {
 		if (input != 0) {
 			throw new IndexOutOfBoundsException();
 		}
 
-		if (isAllFieldsConstant()) {
-			return new FieldSet(field);
+		for (Map.Entry<Integer, FieldSet> e : fieldMapping.entrySet()) {
+			if (e.getValue().contains(targetField)) {
+				return e.getKey();
+			}
 		}
-
-		return this.forwardedFrom(field);
+		return -1;
 	}
 
-	public FieldSet forwardedFrom(int dest) {
-		FieldSet fs = null;
-		for (Map.Entry<Integer, FieldSet> entry : forwardedFields.entrySet()) {
-			if (entry.getValue().contains(dest)) {
-				if (fs == null) {
-					fs = new FieldSet();
-				}
-
-				fs = fs.addField(entry.getKey());
-			}
+	@Override
+	public FieldSet getReadFields(int input) {
+		if (input != 0) {
+			throw new IndexOutOfBoundsException();
 		}
-		return fs;
-	}
 
-	public SingleInputSemanticProperties() {
-		init();
+		return this.readFields;
 	}
-	
+
 	/**
 	 * Adds, to the existing information, a field that is forwarded directly
 	 * from the source record(s) to the destination record(s).
-	 * 
+	 *
 	 * @param sourceField the position in the source record(s)
-	 * @param destinationField the position in the destination record(s)
+	 * @param targetField the position in the destination record(s)
 	 */
-	public void addForwardedField(int sourceField, int destinationField) {
-		FieldSet old = this.forwardedFields.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
+	public void addForwardedField(int sourceField, int targetField) {
+		if(isTargetFieldPresent(targetField)) {
+			throw new InvalidSemanticAnnotationException("Target field "+targetField+" was added twice.");
 		}
-		
-		FieldSet fs = old.addField(destinationField);
-		this.forwardedFields.put(sourceField, fs);
-	}
-	
-	/**
-	 * Adds, to the existing information, a field that is forwarded directly
-	 * from the source record(s) to multiple fields in the destination
-	 * record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void addForwardedField(int sourceField, FieldSet destinationFields) {
-		FieldSet old = this.forwardedFields.get(sourceField);
-		if (old == null) {
-			old = FieldSet.EMPTY_SET;
+
+		FieldSet targetFields = fieldMapping.get(sourceField);
+		if (targetFields != null) {
+			fieldMapping.put(sourceField, targetFields.addField(targetField));
+		} else {
+			fieldMapping.put(sourceField, new FieldSet(targetField));
 		}
-		
-		FieldSet fs = old.addFields(destinationFields);
-		this.forwardedFields.put(sourceField, fs);
 	}
-	
-	/**
-	 * Sets a field that is forwarded directly from the source
-	 * record(s) to multiple fields in the destination record(s).
-	 * 
-	 * @param sourceField the position in the source record(s)
-	 * @param destinationFields the position in the destination record(s)
-	 */
-	public void setForwardedField(int sourceField, FieldSet destinationFields) {
-		this.forwardedFields.put(sourceField,destinationFields);
-	}
-	
-	/**
-	 * Gets the fields in the destination record where the source
-	 * field is forwarded.
-	 * 
-	 * @param sourceField the position in the source record
-	 * @return the destination fields, or null if they do not exist
-	 */
-	public FieldSet getForwardedField(int sourceField) {
-		if (isAllFieldsConstant()) {
-			return new FieldSet(sourceField);
-		}
 
-		return this.forwardedFields.get(sourceField);
+	private boolean isTargetFieldPresent(int targetField) {
+		for(FieldSet targetFields : fieldMapping.values()) {
+			if(targetFields.contains(targetField)) {
+				return true;
+			}
+		}
+		return false;
 	}
-	
+
 	/**
 	 * Adds, to the existing information, field(s) that are read in
 	 * the source record(s).
-	 * 
+	 *
 	 * @param readFields the position(s) in the source record(s)
 	 */
 	public void addReadFields(FieldSet readFields) {
@@ -149,112 +119,39 @@ public class SingleInputSemanticProperties extends SemanticProperties {
 			this.readFields = this.readFields.addFields(readFields);
 		}
 	}
-	
-	/**
-	 * Sets the field(s) that are read in the source record(s).
-	 * 
-	 * @param readFields the position(s) in the source record(s)
-	 */
-	public void setReadFields(FieldSet readFields) {
-		this.readFields = readFields;
-	}
-	
-	/**
-	 * Gets the field(s) in the source record(s) that are read.
-	 * 
-	 * @return the field(s) in the record, or null if they are not set
-	 */
-	public FieldSet getReadFields() {
-		return this.readFields;
-	}
-	
-	/**
-	 * Clears the object.
-	 */
-	@Override
-	public void clearProperties() {
-		super.clearProperties();
-		init();
-	}
-	
-	@Override
-	public boolean isEmpty() {
-		return super.isEmpty() &&
-				(forwardedFields == null || forwardedFields.isEmpty()) &&
-				(readFields == null || readFields.size() == 0);
-	}
 
 	@Override
 	public String toString() {
-		return "SISP(" + this.forwardedFields + ")";
+		return "SISP(" + this.fieldMapping + ")";
 	}
 
-	private void init() {
-		this.forwardedFields = new HashMap<Integer,FieldSet>();
-		this.readFields = null;
-	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public static class AllFieldsConstantProperties extends SingleInputSemanticProperties {
-		
-		private static final long serialVersionUID = 1L;
 
-		@Override
-		public FieldSet getReadFields() {
-			return FieldSet.EMPTY_SET;
-		}
-		
-		@Override
-		public FieldSet getWrittenFields() {
-			return FieldSet.EMPTY_SET;
-		}
+	public static class AllFieldsForwardedProperties extends SingleInputSemanticProperties {
+
+		private static final long serialVersionUID = 1L;
 
 		@Override
-		public FieldSet getForwardedField(int sourceField) {
+		public FieldSet getForwardingTargetFields(int input, int sourceField) {
+			if(input != 0) {
+				throw new IndexOutOfBoundsException();
+			}
 			return new FieldSet(sourceField);
 		}
-		
-		// ----- all mutating operations are unsupported -----
-		
-		@Override
-		public void addForwardedField(int sourceField, FieldSet destinationFields) {
-			throw new UnsupportedOperationException();
-		}
-		
-		@Override
-		public void addForwardedField(int sourceField, int destinationField) {
-			throw new UnsupportedOperationException();
-		}
-		
-		@Override
-		public void setForwardedField(int sourceField, FieldSet destinationFields) {
-			throw new UnsupportedOperationException();
-		}
-		
-		@Override
-		public void addReadFields(FieldSet readFields) {
-			throw new UnsupportedOperationException();
-		}
-		
-		@Override
-		public void setReadFields(FieldSet readFields) {
-			throw new UnsupportedOperationException();
-		}
 
 		@Override
-		public void addWrittenFields(FieldSet writtenFields) {
-			throw new UnsupportedOperationException();
+		public int getForwardingSourceField(int input, int targetField) {
+			if(input != 0) {
+				throw new IndexOutOfBoundsException();
+			}
+			return targetField;
 		}
 
 		@Override
-		public void setWrittenFields(FieldSet writtenFields) {
-			throw new UnsupportedOperationException();
-		}
-		
-		@Override
-		public boolean isEmpty() {
-			return false;
+		public void addForwardedField(int sourceField, int targetField) {
+			throw new UnsupportedOperationException("Cannot modify forwarded fields");
 		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index 6735298..3602a82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -82,7 +82,7 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
 	
 	@Override
 	public SingleInputSemanticProperties getSemanticProperties() {
-		return new SingleInputSemanticProperties.AllFieldsConstantProperties();
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 0a2f1b0..c3ea0e4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -38,10 +39,41 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	}
 	
 	/**
-	 * Returns the keyPosition for the given fieldPosition, offsetted by the given offset
+	 * Returns the flat field descriptors for the given field expression.
+	 *
+	 * @param fieldExpression The field expression for which the flat field descriptors are computed.
+	 * @return The list of descriptors for the flat fields which are specified by the field expression.
+	 */
+	public List<FlatFieldDescriptor> getFlatFields(String fieldExpression) {
+		List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>();
+		this.getFlatFields(fieldExpression, 0, result);
+		return result;
+	}
+
+	/**
+	 * Computes the flat field descriptors for the given field expression with the given offset.
+	 *
+	 * @param fieldExpression The field expression for which the FlatFieldDescriptors are computed.
+	 * @param offset The offset to use when computing the positions of the flat fields.
+	 * @param result The list into which all flat field descriptors are inserted.
+	 */
+	public abstract void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result);
+
+	/**
+	 * Returns the type of the (nested) field at the given field expression position.
+	 * Wildcards are not allowed.
+	 *
+	 * @param fieldExpression The field expression identifying the field whose type is returned.
+	 * @return The type of the field at the given field expression.
+	 */
+	public abstract <X> TypeInformation<X> getTypeAt(String fieldExpression);
+
+	/**
+	 * Returns the type of the (unnested) field at the given field position.
+	 *
+	 * @param pos The position of the (unnested) field in this composite type.
+	 * @return The type of the field at the given position.
 	 */
-	public abstract void getKey(String fieldExpression, int offset, List<FlatFieldDescriptor> result);
-	
 	public abstract <X> TypeInformation<X> getTypeAt(int pos);
 	
 	/**
@@ -105,8 +137,8 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 		private TypeInformation<?> type;
 		
 		public FlatFieldDescriptor(int keyPosition, TypeInformation<?> type) {
-			if( !(type instanceof AtomicType)) {
-				throw new IllegalArgumentException("A flattened field can only be an atomic type");
+			if(type instanceof CompositeType) {
+				throw new IllegalArgumentException("A flattened field can not be a composite type");
 			}
 			this.keyPosition = keyPosition;
 			this.type = type;
@@ -126,4 +158,11 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 			return "FlatFieldDescriptor [position="+keyPosition+" typeInfo="+type+"]";
 		}
 	}
+
+	public static class InvalidFieldReferenceException extends IllegalArgumentException {
+
+		public InvalidFieldReferenceException(String s) {
+			super(s);
+		}
+	}
 }


Mime
View raw message