flink-commits mailing list archives

From fhue...@apache.org
Subject [11/13] flink git commit: [FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions. - Renamed constantField annotations to forwardedFields annotation - Forwarded fields can be defined for (nested) tuples, Po
Date Wed, 28 Jan 2015 01:24:11 GMT
[FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked semantic annotations for functions.
- Renamed the constantFields annotations to forwardedFields annotations
- Forwarded fields can be defined for (nested) tuples, POJOs, and case classes
- Added semantic function information to example programs

This closes #311
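
For readers migrating from the old annotations, a minimal sketch of the renamed class-level annotation (the function below is hypothetical, not part of this commit): where a user function previously carried @ConstantFields, it now carries @ForwardedFields from FunctionAnnotation, and the declared positions tell the optimizer which input fields reach the output unchanged. The per-input operator hints withForwardedFieldsFirst/withForwardedFieldsSecond replace withConstantSetFirst/withConstantSetSecond, as the Spargel diff below shows.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical map function: the key stays in field 0, only field 1 is rewritten.
// The annotation (formerly @ConstantFields("0")) declares that input field 0 is
// forwarded unchanged to output field 0, so partitioning or sorting on the key survives.
@ForwardedFields("0")
public class ScaleValue implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

	@Override
	public Tuple2<Long, Double> map(Tuple2<Long, Double> in) {
		return new Tuple2<Long, Double>(in.f0, in.f1 * 2.0);
	}
}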


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de8e066c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de8e066c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de8e066c

Branch: refs/heads/master
Commit: de8e066ccbd0a31e5746bc0bee524a48bba3a552
Parents: 78f41e9
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Dec 17 18:59:14 2014 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Jan 28 01:39:01 2015 +0100

----------------------------------------------------------------------
 .../spargel/java/VertexCentricIteration.java    |    2 +-
 .../spargel/java/SpargelTranslationTest.java    |    8 +-
 .../aggregation/ComparableAggregator.java       |    5 +-
 .../api/function/aggregation/SumAggregator.java |    4 +-
 .../dag/AbstractPartialSolutionNode.java        |    3 +-
 .../flink/compiler/dag/BinaryUnionNode.java     |   37 +-
 .../flink/compiler/dag/BulkIterationNode.java   |    3 +-
 .../apache/flink/compiler/dag/DataSinkNode.java |    3 +-
 .../flink/compiler/dag/DataSourceNode.java      |    3 +-
 .../apache/flink/compiler/dag/FilterNode.java   |    6 +-
 .../flink/compiler/dag/OptimizerNode.java       |   28 -
 .../flink/compiler/dag/PartitionNode.java       |    7 +-
 .../flink/compiler/dag/UnaryOperatorNode.java   |    4 +-
 .../compiler/dag/WorksetIterationNode.java      |    3 +-
 .../dataproperties/GlobalProperties.java        |  156 +-
 .../dataproperties/InterestingProperties.java   |    8 +-
 .../dataproperties/LocalProperties.java         |  139 +-
 .../RequestedGlobalProperties.java              |   99 +-
 .../RequestedLocalProperties.java               |   49 +-
 .../postpass/GenericFlatTypePostPass.java       |    2 +-
 .../apache/flink/compiler/DOPChangeTest.java    |    7 +-
 .../flink/compiler/IterationsCompilerTest.java  |    6 +-
 .../compiler/SemanticPropOptimizerTest.java     |  941 ---------
 .../SemanticPropertiesAPIToPlanTest.java        |  182 ++
 .../flink/compiler/SortPartialReuseTest.java    |    8 +-
 .../CoGroupCustomPartitioningTest.java          |    4 +-
 .../JoinCustomPartitioningTest.java             |    4 +-
 .../GlobalPropertiesFilteringTest.java          |  418 +++-
 .../GlobalPropertiesMatchingTest.java           |    3 +-
 .../GlobalPropertiesPushdownTest.java           |   29 +-
 .../LocalPropertiesFilteringTest.java           |  376 ++++
 .../dataproperties/MockDistribution.java        |   49 +
 .../dataproperties/MockPartitioner.java         |    5 +-
 .../RequestedGlobalPropertiesFilteringTest.java |  433 +++++
 .../RequestedLocalPropertiesFilteringTest.java  |  248 +++
 .../WorksetIterationsJavaApiCompilerTest.java   |    6 +-
 .../api/common/operators/DualInputOperator.java |    2 +-
 .../operators/DualInputSemanticProperties.java  |  329 +---
 .../flink/api/common/operators/Ordering.java    |    2 +-
 .../common/operators/SemanticProperties.java    |  103 +-
 .../common/operators/SingleInputOperator.java   |    2 +-
 .../SingleInputSemanticProperties.java          |  239 +--
 .../operators/base/PartitionOperatorBase.java   |    2 +-
 .../api/common/typeutils/CompositeType.java     |   49 +-
 .../DualInputSemanticPropertiesTest.java        |  255 +++
 .../SingleInputSemanticPropertiesTest.java      |  183 ++
 .../flink/examples/java/clustering/KMeans.java  |    9 +-
 .../java/graph/ConnectedComponents.java         |   16 +-
 .../examples/java/graph/EnumTrianglesBasic.java |    3 +
 .../examples/java/graph/EnumTrianglesOpt.java   |    4 +
 .../examples/java/graph/PageRankBasic.java      |    6 +-
 .../java/graph/TransitiveClosureNaive.java      |    6 +-
 .../examples/java/ml/LinearRegression.java      |    3 +
 .../java/relational/WebLogAnalysis.java         |    2 +
 .../examples/scala/clustering/KMeans.scala      |    8 +-
 .../scala/graph/ConnectedComponents.scala       |    6 +-
 .../examples/scala/graph/DeltaPageRank.scala    |    3 +-
 .../scala/graph/EnumTrianglesBasic.scala        |    3 +
 .../examples/scala/graph/EnumTrianglesOpt.scala |    5 +-
 .../examples/scala/graph/PageRankBasic.scala    |    4 +-
 .../scala/graph/TransitiveClosureNaive.scala    |    6 +-
 .../scala/relational/WebLogAnalysis.scala       |    4 +-
 .../api/java/functions/FunctionAnnotation.java  |  586 +++---
 .../api/java/functions/SemanticPropUtil.java    |  697 ++++---
 .../flink/api/java/operators/CrossOperator.java |   13 +-
 .../api/java/operators/DistinctOperator.java    |    3 +-
 .../flink/api/java/operators/JoinOperator.java  |   39 +-
 .../apache/flink/api/java/operators/Keys.java   |    8 +-
 .../api/java/operators/ProjectOperator.java     |    3 +-
 .../java/operators/SingleInputUdfOperator.java  |   98 +-
 .../api/java/operators/TwoInputUdfOperator.java |  188 +-
 .../translation/KeyExtractingMapper.java        |    3 +-
 .../translation/KeyRemovingMapper.java          |    3 +-
 .../translation/PlanFilterOperator.java         |    3 +-
 .../record/functions/FunctionAnnotation.java    |  233 +--
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  171 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  169 +-
 .../java/functions/SemanticPropUtilTest.java    | 1813 ++++++++++++------
 .../SemanticPropertiesProjectionTest.java       |  383 ++--
 .../SemanticPropertiesTranslationTest.java      |  798 +++++---
 .../flink/api/java/operators/KeysTest.java      |   21 +-
 .../record/CoGroupWrappingFunctionTest.java     |   19 +-
 .../java/record/ReduceWrappingFunctionTest.java |   19 +-
 .../type/extractor/PojoTypeExtractionTest.java  |   18 +-
 .../java/type/extractor/TypeExtractorTest.java  |   14 +-
 .../api/java/typeutils/CompositeTypeTest.java   |  178 ++
 .../typeutils/runtime/PojoSerializerTest.java   |    2 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   18 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  165 +-
 .../CoGroupConnectedComponentsSecondITCase.java |    8 +-
 .../SemanticPropertiesTranslationTest.scala     |  152 +-
 .../scala/types/TypeInformationGenTest.scala    |  139 ++
 92 files changed, 6689 insertions(+), 3817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index a3a19d3..4f84467 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -350,7 +350,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 		}
 
 		// let the operator know that we preserve the key field
-		updates.withConstantSetFirst("0").withConstantSetSecond("0");
+		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
 		
 		return iteration.closeWith(updates, updates);
 		

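The hunk above shows the renamed operator-level hints. A hedged usage sketch outside Spargel (the data sets, join function, and key positions are hypothetical assumptions, using the Java DataSet API as of this commit):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class ForwardedFieldsHintExample {

	// Hypothetical join function: keeps the key of the first input in field 0
	// and combines the values in field 1.
	public static class ApplyDelta implements
			JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {

		@Override
		public Tuple2<Long, Double> join(Tuple2<Long, Double> vertex, Tuple2<Long, Double> delta) {
			return new Tuple2<Long, Double>(vertex.f0, vertex.f1 + delta.f1);
		}
	}

	// The renamed hints (formerly withConstantSetFirst/withConstantSetSecond) declare
	// that field 0 of both inputs reaches field 0 of the join result, so an existing
	// partitioning on the key does not have to be re-established.
	public static DataSet<Tuple2<Long, Double>> applyDeltas(
			DataSet<Tuple2<Long, Double>> vertices,
			DataSet<Tuple2<Long, Double>> deltas) {

		return vertices.join(deltas).where(0).equalTo(0)
				.with(new ApplyDelta())
				.withForwardedFieldsFirst("0")
				.withForwardedFieldsSecond("0");
	}
}
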
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
index 96692ef..b31618c 100644
--- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -100,8 +100,8 @@ public class SpargelTranslationTest {
 			
 			// validate that the semantic properties are set as they should
 			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
 			
 			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
 			
@@ -179,8 +179,8 @@ public class SpargelTranslationTest {
 			
 			// validate that the semantic properties are set as they should
 			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
 			
 			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
index 7ea7ba1..226c45a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation;
 
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -189,9 +188,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
 			@SuppressWarnings("unchecked")
 			CompositeType<T> cType = (CompositeType<T>) typeInfo;
 
-			List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
-			cType.getKey(field, 0, fieldDescriptors);
-
+			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
 			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
 
 			if (cType instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
index 384b4f6..142028b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.function.aggregation;
 
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -138,8 +137,7 @@ public abstract class SumAggregator {
 			@SuppressWarnings("unchecked")
 			CompositeType<T> cType = (CompositeType<T>) type;
 
-			List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
-			cType.getKey(field, 0, fieldDescriptors);
+			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
 
 			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
 			Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
index c54076d..d996fe9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -88,7 +89,7 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return null;
+		return new EmptySemanticProperties();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
index 5d805a9..bbe9563 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
@@ -24,9 +24,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -255,9 +255,7 @@ public class BinaryUnionNode extends TwoInputNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		DualInputSemanticProperties sprops = new DualInputSemanticProperties();
-		sprops.setAllFieldsConstant(true);
-		return sprops;
+		return new UnionSemanticProperties();
 	}
 	
 	@Override
@@ -270,4 +268,35 @@ public class BinaryUnionNode extends TwoInputNode {
 		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
 			in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
 	}
+
+	public static class UnionSemanticProperties implements SemanticProperties {
+
+		@Override
+		public FieldSet getForwardingTargetFields(int input, int sourceField) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException("Invalid input index for binary union node.");
+			}
+
+			return new FieldSet(sourceField);
+		}
+
+		@Override
+		public int getForwardingSourceField(int input, int targetField) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException();
+			}
+
+			return targetField;
+		}
+
+		@Override
+		public FieldSet getReadFields(int input) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException();
+			}
+
+			return FieldSet.EMPTY_SET;
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
index 18ac4c2..43b5799 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BulkIterationNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.CompilerException;
@@ -186,7 +187,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return null;
+		return new EmptySemanticProperties();
 	}
 	
 	protected void readStubAnnotations() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
index 2d4fb30..aa80451 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSinkNode.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -234,7 +235,7 @@ public class DataSinkNode extends OptimizerNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return null;
+		return new EmptySemanticProperties();
 	}
 		
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 752a763..10c77ca 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
@@ -195,7 +196,7 @@ public class DataSourceNode extends OptimizerNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return null;
+		return new EmptySemanticProperties();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
index 140734c..33b2049 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
@@ -52,11 +52,7 @@ public class FilterNode extends SingleInputNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		sprops.setAllFieldsConstant(true);
-
-		return sprops;
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
index 75becf8..b717560 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/OptimizerNode.java
@@ -666,35 +666,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	// ------------------------------------------------------------------------
 	// Access of stub annotations
 	// ------------------------------------------------------------------------
-	
-	/**
-	 * Returns the key columns for the specific input, if all keys are preserved
-	 * by this node. Null, otherwise.
-	 */
-	protected int[] getConstantKeySet(int input) {
-		Operator<?> contract = getPactContract();
-		if (contract instanceof AbstractUdfOperator<?, ?>) {
-			AbstractUdfOperator<?, ?> abstractPact = (AbstractUdfOperator<?, ?>) contract;
-			int[] keyColumns = abstractPact.getKeyColumns(input);
-			if (keyColumns != null) {
-				if (keyColumns.length == 0) {
-					return null;
-				}
-				for (int keyColumn : keyColumns) {
-					FieldSet fs = getSemanticProperties() == null ? null : getSemanticProperties().getForwardFields(input, keyColumn);
 
-					if (fs == null) {
-						return null;
-					} else if (!fs.contains(keyColumn)) {
-						return null;
-					}
-				}
-				return keyColumns;
-			}
-		}
-		return null;
-	}
-	
 	/**
 	 * An optional method where nodes can describe which fields will be unique in their output.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
index 53b5dd9..75961bf 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -74,9 +76,8 @@ public class PartitionNode extends SingleInputNode {
 	}
 	
 	@Override
-	public boolean isFieldConstant(int input, int fieldNumber) {
-		// Partition does not change any data
-		return true;
+	public SemanticProperties getSemanticProperties() {
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
index aaf0a10..90ba480 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/UnaryOperatorNode.java
@@ -59,9 +59,7 @@ public class UnaryOperatorNode extends SingleInputNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
-		sprops.setAllFieldsConstant(true);
-		return sprops;
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index 95fc066..0557633 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
@@ -222,7 +223,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return null;
+		return new EmptySemanticProperties();
 	}
 
 	protected void readStubAnnotations() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 4fe632a..fb1f1a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -19,7 +19,6 @@
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -32,6 +31,8 @@ import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents global properties of the data at a certain point in the plan.
@@ -41,6 +42,8 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
  * or an FieldSet with the hash partitioning columns.
  */
 public class GlobalProperties implements Cloneable {
+
+	public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
 	
 	private PartitioningProperty partitioning;	// the type partitioning
 	
@@ -213,18 +216,6 @@ public class GlobalProperties implements Cloneable {
 		}
 	}
 
-	public Ordering getOrdering() {
-		return this.ordering;
-	}
-
-	public void setOrdering(Ordering ordering) {
-		this.ordering = ordering;
-	}
-
-	public void setPartitioningFields(FieldList partitioningFields) {
-		this.partitioningFields = partitioningFields;
-	}
-
 	public boolean isFullyReplicated() {
 		return this.partitioning == PartitioningProperty.FULL_REPLICATION;
 	}
@@ -246,83 +237,114 @@ public class GlobalProperties implements Cloneable {
 	}
 
 	/**
-	 * Filters these GlobalProperties by the fields that are constant or forwarded to another output field.
+	 * Filters these GlobalProperties by the fields that are forwarded to the output
+	 * as described by the SemanticProperties.
 	 *
-	 * @param props The node representing the contract.
+	 * @param props The semantic properties holding information about forwarded fields.
 	 * @param input The index of the input.
 	 * @return The filtered GlobalProperties
 	 */
 	public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-		// check if partitioning survives
-		FieldList forwardFields = null;
-		GlobalProperties returnProps = this;
 
 		if (props == null) {
-			return new GlobalProperties();
+			throw new NullPointerException("SemanticProperties may not be null.");
 		}
 
-		if (this.ordering != null) {
-			Ordering no = new Ordering();
-			for (int index : this.ordering.getInvolvedIndexes()) {
-				forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
-				if (forwardFields == null) {
-					returnProps = new GlobalProperties();
-					no = null;
-					break;
-				} else {
-					returnProps = returnProps == this ? this.clone() : returnProps;
-					for (int i = 0; i < forwardFields.size(); i++) {
-						no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), this.ordering.getOrder(index));
+		GlobalProperties gp = new GlobalProperties();
+
+		// filter partitioning
+		switch(this.partitioning) {
+			case FULL_REPLICATION:
+				return gp;
+			case RANGE_PARTITIONED:
+				// check if ordering is preserved
+				Ordering newOrdering = new Ordering();
+				for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+					int sourceField = this.ordering.getInvolvedIndexes().get(i);
+					FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+					if (targetField == null || targetField.size() == 0) {
+						// partitioning is destroyed
+						newOrdering = null;
+						break;
+					} else {
+						// use any field of target fields for now. We should use something like field equivalence sets in the future.
+						if(targetField.size() > 1) {
+							LOG.warn("Found that a field is forwarded to more than one target field in " +
+									"semantic forwarded field information. Will only use the field with the lowest index.");
+						}
+						newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
 					}
 				}
-				returnProps.setOrdering(no);
-			}
-		}
-		if (this.partitioningFields != null) {
-			returnProps = returnProps == this ? this.clone() : returnProps;
-			returnProps.setPartitioningFields(new FieldList());
+				if(newOrdering != null) {
+					gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
+					gp.ordering = newOrdering;
+					gp.partitioningFields = newOrdering.getInvolvedIndexes();
+				}
+				break;
+			case HASH_PARTITIONED:
+			case ANY_PARTITIONING:
+			case CUSTOM_PARTITIONING:
+				FieldList newPartitioningFields = new FieldList();
+				for (int sourceField : this.partitioningFields) {
+					FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
 
-			for (int index : this.partitioningFields) {
-				forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
-				if (forwardFields == null) {
-					returnProps = new GlobalProperties();
-					break;
-				} else  {
-					returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields));
+					if (targetField == null || targetField.size() == 0) {
+						newPartitioningFields = null;
+						break;
+					} else {
+						// use any field of target fields for now.  We should use something like field equivalence sets in the future.
+						if(targetField.size() > 1) {
+							LOG.warn("Found that a field is forwarded to more than one target field in " +
+									"semantic forwarded field information. Will only use the field with the lowest index.");
+						}
+						newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]);
+					}
 				}
-			}
+				if(newPartitioningFields != null) {
+					gp.partitioning = this.partitioning;
+					gp.partitioningFields = newPartitioningFields;
+					gp.customPartitioner = this.customPartitioner;
+				}
+				break;
+			case FORCED_REBALANCED:
+			case RANDOM:
+				gp.partitioning = this.partitioning;
+				break;
+			default:
+				throw new RuntimeException("Unknown partitioning type.");
 		}
+
+		// filter unique field combinations
 		if (this.uniqueFieldCombinations != null) {
-			HashSet<FieldSet> newSet = new HashSet<FieldSet>();
-			newSet.addAll(this.uniqueFieldCombinations);
-			for (Iterator<FieldSet> combos = this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){
-				FieldSet current = combos.next();
-				FieldSet nfs = new FieldSet();
-				for (Integer field : current) {
-					if (props.getForwardFields(input, field) == null) {
-						newSet.remove(current);
-						nfs = null;
+			Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>();
+			for (FieldSet fieldCombo : this.uniqueFieldCombinations) {
+				FieldSet newFieldCombo = new FieldSet();
+				for (Integer sourceField : fieldCombo) {
+					FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+					if (targetField == null || targetField.size() == 0) {
+						newFieldCombo = null;
 						break;
 					} else {
-						nfs = nfs.addFields(props.getForwardFields(input, field));
+						// use any field of target fields for now.  We should use something like field equivalence sets in the future.
+						if(targetField.size() > 1) {
+							LOG.warn("Found that a field is forwarded to more than one target field in " +
+									"semantic forwarded field information. Will only use the field with the lowest index.");
+						}
+						newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]);
 					}
 				}
-				if (nfs != null) {
-					newSet.remove(current);
-					newSet.add(nfs);
+				if (newFieldCombo != null) {
+					newUniqueFieldCombinations.add(newFieldCombo);
 				}
 			}
-
-			GlobalProperties gp = returnProps.clone();
-			gp.uniqueFieldCombinations = newSet.isEmpty() ? null : newSet;
-			return gp;
-		}
-
-		if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
-			return new GlobalProperties();
+			if(!newUniqueFieldCombinations.isEmpty()) {
+				gp.uniqueFieldCombinations = newUniqueFieldCombinations;
+			}
 		}
 
-		return returnProps;
+		return gp;
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
index 0f60576..85736c6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/InterestingProperties.java
@@ -94,11 +94,11 @@ public class InterestingProperties implements Cloneable
 	public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
 	{
 		InterestingProperties iProps = new InterestingProperties();
-		SemanticProperties props = null;
-		if (node instanceof SingleInputNode) {
-			props = node.getSemanticProperties();
-		} else if (node instanceof TwoInputNode) {
+		SemanticProperties props;
+		if (node instanceof SingleInputNode || node instanceof TwoInputNode) {
 			props = node.getSemanticProperties();
+		} else {
+			props = new SemanticProperties.EmptySemanticProperties();
 		}
 
 		for (RequestedGlobalProperties rgp : this.globalProps) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
index b06774c..0555599 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/LocalProperties.java
@@ -20,20 +20,23 @@
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents local properties of the data. A local property is a property that exists
  * within the data of a single partition.
  */
 public class LocalProperties implements Cloneable {
-	
+
+	public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
+
 	public static final LocalProperties EMPTY = new LocalProperties();
 	
 	// --------------------------------------------------------------------------------------------
@@ -126,101 +129,107 @@ public class LocalProperties implements Cloneable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Filters these properties by what can be preserved through a user function's constant fields set.
-	 *
-	 * @param props The optimizer node that potentially modifies the properties.
-	 * @param input The input of the node which is relevant.
+	 * Filters these LocalProperties by the fields that are forwarded to the output
+	 * as described by the SemanticProperties.
 	 *
+	 * @param props The semantic properties holding information about forwarded fields.
+	 * @param input The index of the input.
 	 * @return The filtered LocalProperties
 	 */
 	public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-		// check, whether the local order is preserved
-		Ordering no = this.ordering;
-		FieldList ngf = this.groupedFields;
-		Set<FieldSet> nuf = this.uniqueFields;
-		FieldList forwardList = null;
 
 		if (props == null) {
-			return new LocalProperties();
+			throw new NullPointerException("SemanticProperties may not be null.");
 		}
 
+		LocalProperties returnProps = new LocalProperties();
+
+		// check if sorting is preserved
 		if (this.ordering != null) {
-			no = new Ordering();
-			final FieldList involvedIndexes = this.ordering.getInvolvedIndexes();
-			for (int i = 0; i < involvedIndexes.size(); i++) {
-				forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList();
+			Ordering newOrdering = new Ordering();
 
-				if (forwardList == null) {
-					no = null;
-					ngf = null;
-					/*if (i == 0) {
-						no = null;
-						ngf = null;
+			for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+				int sourceField = this.ordering.getInvolvedIndexes().get(i);
+				FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+				if (targetField == null || targetField.size() == 0) {
+					if (i == 0) {
+						// order fully destroyed
+						newOrdering = null;
+						break;
 					} else {
-						no = this.ordering.createNewOrderingUpToIndex(i);
-						ngf = no.getInvolvedIndexes();
-					}*/
-					break;
+						// order partially preserved
+						break;
+					}
 				} else {
-					no.appendOrdering(forwardList.get(0), this.ordering.getType(i), this.ordering.getOrder(i));
-					ngf = no.getInvolvedIndexes();
+					// use any field of target fields for now.  We should use something like field equivalence sets in the future.
+					if(targetField.size() > 1) {
+						LOG.warn("Found that a field is forwarded to more than one target field in " +
+								"semantic forwarded field information. Will only use the field with the lowest index.");
+					}
+					newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
 				}
 			}
+
+			returnProps.ordering = newOrdering;
+			if (newOrdering != null) {
+				returnProps.groupedFields = newOrdering.getInvolvedIndexes();
+			} else {
+				returnProps.groupedFields = null;
+			}
 		}
+		// check if grouping is preserved
 		else if (this.groupedFields != null) {
-			// check, whether the local key grouping is preserved
-			for (Integer index : this.groupedFields) {
-				forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList();
-				if (forwardList == null) {
-					ngf = null;
+			FieldList newGroupedFields = new FieldList();
+
+			for (Integer sourceField : this.groupedFields) {
+				FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+				if (targetField == null || targetField.size() == 0) {
+					newGroupedFields = null;
 					break;
-				} else if (!forwardList.contains(index)) {
-					FieldList grouped = new FieldList();
-					for (Integer value : ngf.toFieldList()) {
-						if (value.intValue() == index) {
-							grouped = grouped.addFields(forwardList);
-						} else {
-							grouped = grouped.addField(value);
-						}
+				} else {
+					// use any field of target fields for now.  We should use something like field equivalence sets in the future.
+					if(targetField.size() > 1) {
+						LOG.warn("Found that a field is forwarded to more than one target field in " +
+								"semantic forwarded field information. Will only use the field with the lowest index.");
 					}
-					ngf = grouped;
+					newGroupedFields = newGroupedFields.addField(targetField.toArray()[0]);
 				}
 			}
+			returnProps.groupedFields = newGroupedFields;
 		}
 
 		if (this.uniqueFields != null) {
-			HashSet<FieldSet> newSet = new HashSet<FieldSet>();
-			newSet.addAll(this.uniqueFields);
-			for (Iterator<FieldSet> combos = this.uniqueFields.iterator(); combos.hasNext(); ){
-				FieldSet current = combos.next();
-				FieldSet nfs = new FieldSet();
-				for (Integer field : current) {
-					if (props.getForwardFields(input, field) == null) {
-						newSet.remove(current);
-						nfs = null;
+			Set<FieldSet> newUniqueFields = new HashSet<FieldSet>();
+			for (FieldSet fields : this.uniqueFields) {
+				FieldSet newFields = new FieldSet();
+				for (Integer sourceField : fields) {
+					FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+					if (targetField == null || targetField.size() == 0) {
+						newFields = null;
 						break;
 					} else {
-						nfs = nfs.addFields(props.getForwardFields(input, field));
+						// use any field of target fields for now.  We should use something like field equivalence sets in the future.
+						if(targetField.size() > 1) {
+							LOG.warn("Found that a field is forwarded to more than one target field in " +
+									"semantic forwarded field information. Will only use the field with the lowest index.");
+						}
+						newFields = newFields.addField(targetField.toArray()[0]);
 					}
 				}
-				if (nfs != null) {
-					newSet.remove(current);
-					newSet.add(nfs);
+				if (newFields != null) {
+					newUniqueFields.add(newFields);
 				}
 			}
 
-			nuf = newSet.isEmpty() ? null : newSet;
+			if (!newUniqueFields.isEmpty()) {
+				returnProps.uniqueFields = newUniqueFields;
+			} else {
+				returnProps.uniqueFields = null;
+			}
 		}
 
-		if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) {
-			return this;
-		} else {
-			LocalProperties lp = new LocalProperties();
-			lp.ordering = no;
-			lp.groupedFields = ngf;
-			lp.uniqueFields = nuf;
-			return lp;
-		}
+		return returnProps;
 	}
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index 1320f82..de56c37 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
@@ -189,73 +188,65 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.customPartitioner = null;
 	}
 
-	public void setPartitioningFields(FieldSet partitioned) {
-		this.partitioningFields = partitioned;
-	}
-
-	public void setPartitioningFields(FieldSet fields, PartitioningProperty partitioning) {
-		this.partitioningFields = fields;
-		this.partitioning = partitioning;
-	}
-
-	public void setOrdering(Ordering newOrdering) {
-		this.ordering = newOrdering;
-	}
-
 	/**
-	 * Filters these properties by what can be preserved by the given node when propagated down
+	 * Filters these properties by what can be preserved by the given SemanticProperties when propagated down
 	 * to the given input.
 	 *
-	 * @param props The node representing the contract.
-	 * @param input The index of the input.
+	 * @param props The SemanticProperties which define which fields are preserved.
+	 * @param input The index of the operator's input.
 	 * @return The filtered RequestedGlobalProperties
 	 */
 	public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-		FieldList sourceList;
-		RequestedGlobalProperties returnProps = null;
+		RequestedGlobalProperties returnProps = new RequestedGlobalProperties();
 
+		// no semantic properties available. All global properties are filtered.
 		if (props == null) {
-			return null;
+			throw new NullPointerException("SemanticProperties may not be null.");
 		}
 
-		// check if partitioning survives
-		if (this.ordering != null) {
-			Ordering no = new Ordering();
-			returnProps = new RequestedGlobalProperties();
-			returnProps.setPartitioningFields(new FieldSet(), this.partitioning);
+		RequestedGlobalProperties rgProp = new RequestedGlobalProperties();
 
-			for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) {
-				int value = this.ordering.getInvolvedIndexes().get(index);
-				sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList();
-				if (sourceList != null) {
-					no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index));
-				} else {
-					return null;
+		switch(this.partitioning) {
+			case FULL_REPLICATION:
+			case FORCED_REBALANCED:
+			case CUSTOM_PARTITIONING:
+			case RANDOM:
+				// make sure that certain properties are not pushed down
+				return null;
+			case HASH_PARTITIONED:
+			case ANY_PARTITIONING:
+				FieldSet newFields = new FieldSet();
+				for (Integer targetField : this.partitioningFields) {
+					int sourceField = props.getForwardingSourceField(input, targetField);
+					if (sourceField >= 0) {
+						newFields = newFields.addField(sourceField);
+					} else {
+						// partial partitionings are not preserved to avoid skewed partitioning
+						return null;
+					}
 				}
-			}
-			returnProps.setOrdering(no);
-		} else if (this.partitioningFields != null) {
-			returnProps = new RequestedGlobalProperties();
-			returnProps.setPartitioningFields(new FieldSet(), this.partitioning);
-			for (Integer index : this.partitioningFields) {
-				sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList();
-				if (sourceList != null) {
-					returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList), this.partitioning);
-				} else {
-					return null;
+				rgProp.partitioning = this.partitioning;
+				rgProp.partitioningFields = newFields;
+				return rgProp;
+			case RANGE_PARTITIONED:
+				// range partitioning
+				Ordering newOrdering = new Ordering();
+				for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+					int value = this.ordering.getInvolvedIndexes().get(i);
+					int sourceField = props.getForwardingSourceField(input, value);
+					if (sourceField >= 0) {
+						newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i));
+					} else {
+						return null;
+					}
 				}
-			}
-		}
-		// make sure that certain properties are not pushed down
-		final PartitioningProperty partitioning = this.partitioning;
-		if (partitioning == PartitioningProperty.FULL_REPLICATION ||
-				partitioning == PartitioningProperty.FORCED_REBALANCED ||
-				partitioning == PartitioningProperty.CUSTOM_PARTITIONING)
-		{
-			return null;
+				rgProp.partitioning = this.partitioning;
+				rgProp.ordering = newOrdering;
+				rgProp.dataDistribution = this.dataDistribution;
+				return rgProp;
+			default:
+				throw new RuntimeException("Unknown partitioning type encountered.");
 		}
-
-		return returnProps;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
index a5bf118..1f69959 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedLocalProperties.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
@@ -135,50 +134,50 @@ public class RequestedLocalProperties implements Cloneable {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Filters these properties by what can be preserved through a user function's constant fields set.
-	 * Since interesting properties are filtered top-down, anything that partially destroys the ordering
-	 * makes the properties uninteresting.
-	 *
-	 * @param props The optimizer node that potentially modifies the properties.
-	 * @param input The input of the node which is relevant.
+	 * Filters these properties by what can be preserved by the given SemanticProperties when propagated down
+	 * to the given input.
 	 *
+	 * @param props The SemanticProperties which define which fields are preserved.
+	 * @param input The index of the operator's input.
 	 * @return The filtered RequestedLocalProperties
 	 */
 	public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-		FieldList sourceList;
-		RequestedLocalProperties returnProps = this;
 
+		// no semantic properties, all local properties are filtered
 		if (props == null) {
-			return null;
+			throw new NullPointerException("SemanticProperties may not be null.");
 		}
 
 		if (this.ordering != null) {
-			Ordering no = new Ordering();
-			returnProps = this.clone();
-			for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) {
-				int value = this.ordering.getInvolvedIndexes().get(index);
-				sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList();
-				if (sourceList != null) {
-					no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index));
+			Ordering newOrdering = new Ordering();
+
+			for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+				int targetField = this.ordering.getInvolvedIndexes().get(i);
+				int sourceField = props.getForwardingSourceField(input, targetField);
+				if (sourceField >= 0) {
+					newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i));
 				} else {
 					return null;
 				}
 			}
-			returnProps.setOrdering(no);
+			return new RequestedLocalProperties(newOrdering);
+
 		} else if (this.groupedFields != null) {
-			returnProps =  this.clone();
-			returnProps.setGroupedFields(new FieldList());
+			FieldSet newGrouping = new FieldSet();
+
 			// check, whether the local key grouping is preserved
-			for (Integer index : this.groupedFields) {
-				sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList();
-				if (sourceList != null) {
-					returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList));
+			for (Integer targetField : this.groupedFields) {
+				int sourceField = props.getForwardingSourceField(input, targetField);
+				if (sourceField >= 0) {
+					newGrouping = newGrouping.addField(sourceField);
 				} else {
 					return null;
 				}
 			}
+			return new RequestedLocalProperties(newGrouping);
+		} else {
+			return null;
 		}
-		return returnProps;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
index ab083ef..5da215f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
@@ -525,7 +525,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
 				Integer pos = entry.getKey();
 				SemanticProperties sprops = optNode.getSemanticProperties();
 
-				if (sprops != null && sprops.getForwardFields(input, pos) != null && sprops.getForwardFields(input,pos).contains(pos)) {
+				if (sprops != null && sprops.getForwardingTargetFields(input, pos) != null && sprops.getForwardingTargetFields(input, pos).contains(pos)) {
 					targetSchema.addType(pos, entry.getValue());
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index 93a1fc5..c3a4c3a 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -250,7 +250,7 @@ public class DOPChangeTest extends CompilerTestBase {
 		
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
-		
+
 		// check the optimized Plan
 		// when reducer 1 distributes its data across the instances of map2, it needs to employ a local hash method,
 		// because map2 has twice as many instances and key/value pairs with the same key need to be processed by the same
@@ -258,10 +258,7 @@ public class DOPChangeTest extends CompilerTestBase {
 		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
 		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
 
-		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
-		Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy());
-		Assert.assertEquals("The Map 2 Node has an invalid local strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy());
+		Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 87af91b..9445198 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -376,14 +376,14 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	@ConstantFields("0")
+	@ForwardedFields("0")
 	public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
 		
 		@Override
 		public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
 	}
 	
-	@ConstantFields("0")
+	@ForwardedFields("0")
 	public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
 
 		@Override

