flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/4] flink git commit: [FLINK-1444][api-extending] Add support for attaching data properties to data sources
Date Fri, 20 Feb 2015 15:11:47 GMT
Repository: flink
Updated Branches:
  refs/heads/master c56e3f10b -> bed3da4a6


[FLINK-1444][api-extending] Add support for attaching data properties to data sources

This closes #379


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a28bf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a28bf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a28bf5

Branch: refs/heads/master
Commit: f0a28bf5345084a0a43df16021e60078e322e087
Parents: c56e3f1
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Feb 6 14:28:00 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Feb 20 16:10:01 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/DataSourceNode.java      |  82 +-
 .../flink/compiler/plan/SourcePlanNode.java     |  12 +-
 .../flink/compiler/PropertyDataSourceTest.java  | 897 +++++++++++++++++++
 .../common/operators/GenericDataSourceBase.java |  41 +-
 .../flink/api/java/io/SplitDataProperties.java  | 464 ++++++++++
 .../flink/api/java/operators/DataSource.java    |  30 +-
 6 files changed, 1517 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index af2a92b..49946e0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -24,19 +24,25 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.io.ReplicatingInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.CostEstimator;
 import org.apache.flink.compiler.costs.Costs;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.compiler.plan.PlanNode;
 import org.apache.flink.compiler.plan.SourcePlanNode;
 import org.apache.flink.configuration.Configuration;
@@ -51,6 +57,10 @@ public class DataSourceNode extends OptimizerNode {
 
 	private final boolean replicatedInput;
 
+	private GlobalProperties gprops;
+
+	private LocalProperties lprops;
+
 	/**
 	 * Creates a new DataSourceNode for the given contract.
 	 * 
@@ -76,6 +86,20 @@ public class DataSourceNode extends OptimizerNode {
 		} else {
 			this.replicatedInput = false;
 		}
+
+		this.gprops = new GlobalProperties();
+		this.lprops = new LocalProperties();
+
+		SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties();
+
+		if(replicatedInput) {
+			this.gprops.setFullyReplicated();
+			this.lprops = new LocalProperties();
+		} else if (splitProps != null) {
+			// configure data properties of data source using split properties
+			setDataPropertiesFromSplitProperties(splitProps);
+		}
+
 	}
 
 	/**
@@ -184,7 +208,8 @@ public class DataSourceNode extends OptimizerNode {
 			return this.cachedPlans;
 		}
 
-		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")");
+		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")",
+				this.gprops, this.lprops);
 
 		if(!replicatedInput) {
 			candidate.updatePropertiesWithUniqueSets(getUniqueFields());
@@ -205,8 +230,6 @@ public class DataSourceNode extends OptimizerNode {
 				estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs);
 			}
 			candidate.setCosts(costs);
-
-			candidate.getGlobalProperties().setFullyReplicated();
 		}
 
 		// since there is only a single plan for the data-source, return a list with that element only
@@ -228,4 +251,57 @@ public class DataSourceNode extends OptimizerNode {
 			visitor.postVisit(this);
 		}
 	}
+
+	private void setDataPropertiesFromSplitProperties(SplitDataProperties splitProps) {
+
+		// set global properties
+		int[] partitionKeys = splitProps.getSplitPartitionKeys();
+		Partitioner<?> partitioner = splitProps.getSplitPartitioner();
+
+		if(partitionKeys != null && partitioner != null) {
+			this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner);
+		}
+		else if(partitionKeys != null) {
+			this.gprops.setAnyPartitioning(new FieldList(partitionKeys));
+		}
+		// set local properties
+		int[] groupingKeys = splitProps.getSplitGroupKeys();
+		Ordering ordering = splitProps.getSplitOrder();
+
+		// more than one split per source tasks possible.
+		// adapt split grouping and sorting
+		if(ordering != null) {
+
+			// sorting falls back to grouping because a source can read multiple,
+			// randomly assigned splits
+			groupingKeys = ordering.getFieldPositions();
+		}
+
+		if(groupingKeys != null && partitionKeys != null) {
+			// check if grouping is also valid across splits, i.e., whether grouping keys are
+			// valid superset of partition keys
+			boolean allFieldsIncluded = true;
+			for(int i : partitionKeys) {
+				boolean fieldIncluded = false;
+				for(int j : groupingKeys) {
+					if(i == j) {
+						fieldIncluded = true;
+						break;
+					}
+				}
+				if(!fieldIncluded) {
+					allFieldsIncluded = false;
+					break;
+				}
+			}
+			if (allFieldsIncluded) {
+				this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys));
+			} else {
+				this.lprops = new LocalProperties();
+			}
+
+		} else {
+			this.lprops = new LocalProperties();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
index 891345d..813feb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java
@@ -44,13 +44,17 @@ public class SourcePlanNode extends PlanNode {
 	 * @param template The template optimizer node that this candidate is created for.
 	 */
 	public SourcePlanNode(DataSourceNode template, String nodeName) {
+		this(template, nodeName, new GlobalProperties(), new LocalProperties());
+	}
+
+	public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) {
 		super(template, nodeName, DriverStrategy.NONE);
-		
-		this.globalProps = new GlobalProperties();
-		this.localProps = new LocalProperties();
+
+		this.globalProps = gprops;
+		this.localProps = lprops;
 		updatePropertiesWithUniqueSets(template.getUniqueFields());
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public DataSourceNode getDataSourceNode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
new file mode 100644
index 0000000..7b023e5
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.plan.NAryUnionPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"serial"})
+public class PropertyDataSourceTest extends CompilerTestBase {
+
+	private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>();
+	private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
+			BasicTypeInfo.LONG_TYPE_INFO,
+			TypeExtractor.createTypeInfo(SomePojo.class),
+			BasicTypeInfo.STRING_TYPE_INFO
+	);
+
+	@Test
+	public void checkSinglePartitionedSource1() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource2() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource3() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("*");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 2, 3, 4)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource4() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource5() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.stringField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource6() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField; f2");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedSource7() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+		Assert.assertTrue(gprops.getCustomPartitioner() != null);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedGroupedSource1() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedGroupedSource2() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsGroupedBy(1, 0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedGroupedSource3() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedGroupedSource4() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0, 1)
+				.splitsGroupedBy(0);
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedGroupedSource5() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f2")
+				.splitsGroupedBy("f2");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedGroupedSource6() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsGroupedBy("f0; f1.intField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedGroupedSource7() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsGroupedBy("f1");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedGroupedSource8() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1")
+				.splitsGroupedBy("f1.stringField");
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedOrderedSource1() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedOrderedSource2() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(1)
+				.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedOrderedSource3() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedOrderedSource4() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy(0, 1)
+				.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedOrderedSource5() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+			.splitsPartitionedBy("f1.intField")
+			.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkSinglePartitionedOrderedSource6() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1.intField")
+				.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+	@Test
+	public void checkSinglePartitionedOrderedSource7() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+		data.getSplitDataProperties()
+				.splitsPartitionedBy("f1")
+				.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
+
+		data.print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+		GlobalProperties gprops = sourceNode.getGlobalProperties();
+		LocalProperties lprops = sourceNode.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3)));
+		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+		Assert.assertTrue(lprops.getGroupedFields() == null);
+		Assert.assertTrue(lprops.getOrdering() == null);
+
+	}
+
+
+	@Test
+	public void checkCoPartitionedSources1() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data1 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data1.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		DataSource<Tuple2<Long, String>> data2 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data2.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		data1.union(data2).print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+		SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+		LocalProperties lprops1 = sourceNode1.getLocalProperties();
+		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+		LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+		Assert.assertTrue(lprops1.getGroupedFields() == null);
+		Assert.assertTrue(lprops1.getOrdering() == null);
+
+		Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+		Assert.assertTrue(lprops2.getGroupedFields() == null);
+		Assert.assertTrue(lprops2.getOrdering() == null);
+
+		Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+	}
+
+	@Test
+	public void checkCoPartitionedSources2() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		DataSource<Tuple2<Long, String>> data1 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data1.getSplitDataProperties()
+				.splitsPartitionedBy("byCountry", 0);
+
+		DataSource<Tuple2<Long, String>> data2 =
+				env.readCsvFile("/some/path").types(Long.class, String.class);
+
+		data2.getSplitDataProperties()
+				.splitsPartitionedBy("byDate", 0);
+
+		data1.union(data2).print();
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+		SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+		LocalProperties lprops1 = sourceNode1.getLocalProperties();
+		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+		LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+		Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+		Assert.assertTrue(lprops1.getGroupedFields() == null);
+		Assert.assertTrue(lprops1.getOrdering() == null);
+
+		Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+		Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+		Assert.assertTrue(lprops2.getGroupedFields() == null);
+		Assert.assertTrue(lprops2.getOrdering() == null);
+
+		Assert.assertTrue(!gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+	}
+
+
+	public static class SomePojo {
+		public double doubleField;
+		public int intField;
+		public String stringField;
+	}
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 13c5dad..912d13d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -45,6 +46,8 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 
 	protected String statisticsKey;
 
+	private SplitDataProperties splitProperties;
+
 	/**
 	 * Creates a new instance for the given file using the given input format.
 	 *
@@ -157,7 +160,30 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 	public void setStatisticsKey(String statisticsKey) {
 		this.statisticsKey = statisticsKey;
 	}
-	
+
+	/**
+	 * Sets properties of input splits for this data source.
+	 * Split properties can help to generate more efficient execution plans.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong split data properties can cause wrong results!
+	 * </b>
+	 *
+	 * @param splitDataProperties The data properties of this data source's splits.
+	 */
+	public void setSplitDataProperties(SplitDataProperties<OUT> splitDataProperties) {
+		this.splitProperties = splitDataProperties;
+	}
+
+	/**
+	 * Returns the data properties of this data source's splits.
+	 *
+	 * @return The data properties of this data source's splits or null if no properties have been set.
+	 */
+	public SplitDataProperties<OUT> getSplitDataProperties() {
+		return this.splitProperties;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -209,4 +235,17 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 	public String toString() {
 		return this.name;
 	}
+
+
+	public static interface SplitDataProperties<T> {
+
+		public int[] getSplitPartitionKeys();
+
+		public Partitioner<T> getSplitPartitioner();
+
+		public int[] getSplitGroupKeys();
+
+		public Ordering getSplitOrder();
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
new file mode 100644
index 0000000..04d9953
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.Keys;
+
+import java.util.Arrays;
+
+/**
+ * SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}
+ * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}.
+ *
+ * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
+ * SplitDataProperties can define that the elements which are generated by the associated InputFormat
+ * are
+ * <ul>
+ *   <li>Partitioned on one or more fields across InputSplits, i.e., all elements with the same
+ *   (combination of) key(s) are located in the same input split.</li>
+ *   <li>Grouped on one or more fields within an InputSplit, i.e., all elements of an input split
+ *   that have the same (combination of) key(s) are emitted in a single sequence one after the other.</li>
+ *   <li>Ordered on one or more fields within an InputSplit, i.e., all elements within an input split
+ *    are in the defined order.</li>
+ * </ul>
+ *
+ * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
+ * data reorganization steps such as shuffling or sorting can be avoided.
+ * HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b>
+ *
+ * @param <T> The type of the DataSource on which the SplitDataProperties are defined.
+ *
+ * @see org.apache.flink.core.io.InputSplit
+ * @see org.apache.flink.api.common.io.InputFormat
+ * @see org.apache.flink.api.java.operators.DataSource
+ */
+public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataProperties<T> {
+
+	private TypeInformation<T> type;
+
+	private int[] splitPartitionKeys;
+
+	private Partitioner<T> splitPartitioner;
+
+	private int[] splitGroupKeys;
+
+	private Ordering splitOrdering;
+
+	/**
+	 * Creates SplitDataProperties for the given data types.
+	 *
+	 * @param type The data type of the SplitDataProperties.
+	 */
+	public SplitDataProperties(TypeInformation<T> type) {
+		this.type = type;
+	}
+
+	/**
+	 * Creates SplitDataProperties for the given DataSource.
+	 *
+	 * @param source The DataSource for which the SplitDataProperties are created.
+	 */
+	public SplitDataProperties(DataSource<T> source) {
+		this.type = source.getType();
+	}
+
+	/**
+	 * Defines that data is partitioned across input splits on the fields defined by field positions.
+	 * All records sharing the same key (combination) must be contained in a single input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param partitionFields The field positions of the partitioning keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsPartitionedBy(int... partitionFields) {
+		return this.splitsPartitionedBy(null, partitionFields);
+	}
+
+	/**
+	 * Defines that data is partitioned using a specific partitioning method
+	 * across input splits on the fields defined by field positions.
+	 * All records sharing the same key (combination) must be contained in a single input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param partitionMethodId An ID for the method that was used to partition the data across splits.
+	 * @param partitionFields The field positions of the partitioning keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, int... partitionFields) {
+
+		if (partitionFields == null) {
+			throw new InvalidProgramException("PartitionFields may not be null.");
+		} else if (partitionFields.length == 0) {
+			throw new InvalidProgramException("PartitionFields may not be empty.");
+		}
+
+		this.splitPartitionKeys = getAllFlatKeys(partitionFields);
+		if (partitionMethodId != null) {
+			this.splitPartitioner = new SourcePartitionerMarker<T>(partitionMethodId);
+		} else {
+			this.splitPartitioner = null;
+		}
+
+		return this;
+	}
+
+	/**
+	 * Defines that data is partitioned across input splits on the fields defined by field expressions.
+ 	 * Multiple field expressions must be separated by the semicolon ';' character.
+	 * All records sharing the same key (combination) must be contained in a single input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param partitionFields The field expressions of the partitioning keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsPartitionedBy(String partitionFields) {
+		return this.splitsPartitionedBy(null, partitionFields);
+	}
+
+	/**
+	 * Defines that data is partitioned using an identifiable method
+	 * across input splits on the fields defined by field expressions.
+	 * Multiple field expressions must be separated by the semicolon ';' character.
+	 * All records sharing the same key (combination) must be contained in a single input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param partitionMethodId An ID for the method that was used to partition the data across splits.
+	 * @param partitionFields The field expressions of the partitioning keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) {
+
+		if(partitionFields == null) {
+			throw new InvalidProgramException("PartitionFields may not be null.");
+		}
+
+		String[] partitionKeysA = partitionFields.split(";");
+		if (partitionKeysA.length == 0) {
+			throw new InvalidProgramException("PartitionFields may not be empty.");
+		}
+
+		this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
+		if(partitionMethodId != null) {
+			this.splitPartitioner = new SourcePartitionerMarker<T>(partitionMethodId);
+		}
+		else {
+			this.splitPartitioner = null;
+		}
+
+		return this;
+	}
+
+	/**
+	 * Defines that the data within an input split is grouped on the fields defined by the field positions.
+	 * All records sharing the same key (combination) must be subsequently emitted by the input
+	 * format for each input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param groupFields The field positions of the grouping keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
+
+		if(groupFields == null) {
+			throw new InvalidProgramException("GroupFields may not be null.");
+		} else if (groupFields.length == 0) {
+			throw new InvalidProgramException("GroupFields may not be empty.");
+		}
+
+		if(this.splitOrdering != null) {
+			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
+		}
+
+		this.splitGroupKeys = getAllFlatKeys(groupFields);
+
+		return this;
+	}
+
+	/**
+	 * Defines that the data within an input split is grouped on the fields defined by the field expressions.
+	 * Multiple field expressions must be separated by the semicolon ';' character.
+	 * All records sharing the same key (combination) must be subsequently emitted by the input
+	 * format for each input split.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param groupFields The field expressions of the grouping keys.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
+
+		if(groupFields == null) {
+			throw new InvalidProgramException("GroupFields may not be null.");
+		}
+
+		String[] groupKeysA = groupFields.split(";");
+		if (groupKeysA.length == 0) {
+			throw new InvalidProgramException("GroupFields may not be empty.");
+		}
+
+		if(this.splitOrdering != null) {
+			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
+		}
+
+		this.splitGroupKeys = getAllFlatKeys(groupKeysA);
+
+		return this;
+	}
+
+	/**
+	 * Defines that the data within an input split is sorted on the fields defined by the field positions
+	 * in the specified orders.
+	 * All records of an input split must be emitted by the input format in the defined order.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param orderFields The field positions of the sorting keys.
+	 * @param orders The orders of the fields.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) {
+
+		if(orderFields == null || orders == null) {
+			throw new InvalidProgramException("OrderFields or Orders may not be null.");
+		} else if (orderFields.length == 0) {
+			throw new InvalidProgramException("OrderFields may not be empty.");
+		} else if (orders.length == 0) {
+			throw new InvalidProgramException("Orders may not be empty");
+		} else if (orderFields.length != orders.length) {
+			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
+		}
+
+		if(this.splitGroupKeys != null) {
+			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
+		}
+
+		this.splitOrdering = new Ordering();
+
+		for(int i=0; i<orderFields.length; i++) {
+			int pos = orderFields[i];
+			int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
+
+			for(int key : flatKeys) {
+				// check for duplicates
+				for (int okey : splitOrdering.getFieldPositions()) {
+					if (key == okey) {
+						throw new InvalidProgramException("Duplicate field in the field expression " + pos);
+					}
+				}
+				// append key
+				this.splitOrdering.appendOrdering(key, null, orders[i] );
+			}
+		}
+		return this;
+	}
+
+	/**
+	 * Defines that the data within an input split is sorted on the fields defined by the field expressions
+	 * in the specified orders. Multiple field expressions must be separated by the semicolon ';' character.
+	 * All records of an input split must be emitted by the input format in the defined order.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @param orderFields The field expressions of the sorting keys.
+	 * @param orders The orders of the fields.
+	 * @return This SplitDataProperties object.
+	 */
+	public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) {
+
+		if(orderFields == null || orders == null) {
+			throw new InvalidProgramException("OrderFields or Orders may not be null.");
+		}
+
+		String[] orderKeysA = orderFields.split(";");
+		if (orderKeysA.length == 0) {
+			throw new InvalidProgramException("OrderFields may not be empty.");
+		} else if (orders.length == 0) {
+			throw new InvalidProgramException("Orders may not be empty");
+		} else if (orderKeysA.length != orders.length) {
+			throw new InvalidProgramException("Number of OrderFields and Orders must match.");
+		}
+
+		if(this.splitGroupKeys != null) {
+			throw new InvalidProgramException("DataSource may either be grouped or sorted.");
+		}
+
+		this.splitOrdering = new Ordering();
+
+		for(int i=0; i<orderKeysA.length; i++) {
+			String keyExp = orderKeysA[i];
+			int[] flatKeys = this.computeFlatKeys(keyExp);
+
+			for(int key : flatKeys) {
+				// check for duplicates
+				for (int okey : splitOrdering.getFieldPositions()) {
+					if (key == okey) {
+						throw new InvalidProgramException("Duplicate field in field expression " + keyExp);
+					}
+				}
+				// append key
+				this.splitOrdering.appendOrdering(key, null, orders[i] );
+			}
+		}
+		return this;
+	}
+
+	public int[] getSplitPartitionKeys() {
+		return this.splitPartitionKeys;
+	}
+
+	public Partitioner<T> getSplitPartitioner() {
+		return this.splitPartitioner;
+	}
+
+	public int[] getSplitGroupKeys() {
+		return this.splitGroupKeys;
+	}
+
+	public Ordering getSplitOrder() {
+		return this.splitOrdering;
+	}
+
+
+	/////////////////////// FLAT FIELD EXTRACTION METHODS
+
+	private int[] getAllFlatKeys(String[] fieldExpressions) {
+
+		int[] allKeys = null;
+
+		for(String keyExp : fieldExpressions) {
+			int[] flatKeys = this.computeFlatKeys(keyExp);
+			if(allKeys == null) {
+				allKeys = flatKeys;
+			} else {
+				// check for duplicates
+				for(int key1 : flatKeys) {
+					for(int key2 : allKeys) {
+						if(key1 == key2) {
+							throw new InvalidProgramException("Duplicate fields in field expression "+keyExp);
+						}
+					}
+				}
+				// append flat keys
+				int oldLength = allKeys.length;
+				int newLength = oldLength + flatKeys.length;
+				allKeys = Arrays.copyOf(allKeys, newLength);
+				for(int i=0;i<flatKeys.length; i++) {
+					allKeys[oldLength+i] = flatKeys[i];
+				}
+			}
+		}
+
+		return allKeys;
+	}
+
+	private int[] getAllFlatKeys(int[] fieldPositions) {
+
+		Keys.ExpressionKeys<T> ek;
+		try {
+			ek = new Keys.ExpressionKeys<T>(fieldPositions, this.type);
+		} catch(IllegalArgumentException iae) {
+			throw new InvalidProgramException("Invalid specification of field expression.", iae);
+		}
+		return ek.computeLogicalKeyPositions();
+	}
+
+
+	private int[] computeFlatKeys(String fieldExpression) {
+
+		fieldExpression = fieldExpression.trim();
+
+		if(this.type instanceof CompositeType) {
+			// compute flat field positions for (nested) sorting fields
+			Keys.ExpressionKeys<T> ek;
+			try {
+				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
+			} catch(IllegalArgumentException iae) {
+				throw new InvalidProgramException("Invalid specification of field expression.", iae);
+			}
+			return ek.computeLogicalKeyPositions();
+		} else {
+			fieldExpression = fieldExpression.trim();
+			if (!(fieldExpression.equals("*") || fieldExpression.equals("_"))) {
+				throw new InvalidProgramException("Data properties on non-composite types can only be defined on the full type. " +
+						"Use a field wildcard for that (\"*\" or \"_\")");
+			} else {
+				return new int[]{0};
+			}
+		}
+	}
+
+	/**
+	 * A custom partitioner to mark compatible split partitionings.
+	 *
+	 * @param <T> The type of the partitioned data.
+	 */
+	public static class SourcePartitionerMarker<T> implements Partitioner<T> {
+
+		String partitionMarker;
+
+		public SourcePartitionerMarker(String partitionMarker) {
+			this.partitionMarker = partitionMarker;
+		}
+
+		@Override
+		public int partition(T key, int numPartitions) {
+			throw new UnsupportedOperationException("The SourcePartitionerMarker is only used as a marker for compatible partitioning. " +
+					"It must not be invoked.");
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if(o instanceof SourcePartitionerMarker) {
+				return this.partitionMarker.equals(((SourcePartitionerMarker) o).partitionMarker);
+			} else {
+				return false;
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 2352269..d6e511a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.SplitDataProperties;
 import org.apache.flink.configuration.Configuration;
 
 /**
@@ -41,6 +42,8 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 
 	private Configuration parameters;
 
+	private SplitDataProperties<OUT> splitDataProperties;
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -90,7 +93,28 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	public Configuration getParameters() {
 		return this.parameters;
 	}
-	
+
+
+	/**
+	 * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the
+	 * {@link org.apache.flink.core.io.InputSplit}s of this DataSource,
+	 * which can be configured by the caller.
+	 *
+	 * SplitDataProperties can help to generate more efficient execution plans.
+	 * <br>
+	 * <b>
+	 *     IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
+	 * </b>
+	 *
+	 * @return The SplitDataProperties for the InputSplits of this DataSource.
+	 */
+	public SplitDataProperties<OUT> getSplitDataProperties() {
+		if(this.splitDataProperties == null) {
+			this.splitDataProperties = new SplitDataProperties<OUT>(this);
+		}
+		return this.splitDataProperties;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
@@ -106,6 +130,10 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 		if(this.parameters != null) {
 			source.getParameters().addAll(this.parameters);
 		}
+		if(this.splitDataProperties != null) {
+			source.setSplitDataProperties(this.splitDataProperties);
+		}
 		return source;
 	}
+
 }


Mime
View raw message