flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [26/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:07:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
deleted file mode 100644
index e65758f..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class NestedIterationsTest extends CompilerTestBase {
-
-	@Test
-	public void testRejectNestedBulkIterations() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> data = env.generateSequence(1, 100);
-			
-			IterativeDataSet<Long> outerIteration = data.iterate(100);
-			
-			IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
-			
-			DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
-			
-			DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
-			
-			outerResult.print();
-			
-			Plan p = env.createProgramPlan();
-			
-			try {
-				compileNoStats(p);
-			}
-			catch (CompilerException e) {
-				assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testRejectNestedWorksetIterations() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-			
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
-			
-			DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
-			
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
-			
-			DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
-			
-			DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
-			
-			DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
-			
-			outerResult.print();
-			
-			Plan p = env.createProgramPlan();
-			
-			try {
-				compileNoStats(p);
-			}
-			catch (CompilerException e) {
-				assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testBulkIterationInClosure() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> data1 = env.generateSequence(1, 100);
-			DataSet<Long> data2 = env.generateSequence(1, 100);
-			
-			IterativeDataSet<Long> firstIteration = data1.iterate(100);
-			
-			DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
-			
-			
-			IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
-			
-			DataSet<Long> joined = mainIteration.join(firstResult)
-					.where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
-					.with(new DummyFlatJoinFunction<Long>());
-			
-			DataSet<Long> mainResult = mainIteration.closeWith(joined);
-			
-			mainResult.print();
-			
-			Plan p = env.createProgramPlan();
-			
-			// optimizer should be able to translate this
-			OptimizedPlan op = compileNoStats(p);
-			
-			// job graph generator should be able to translate this
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testDeltaIterationInClosure() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-			DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-			
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
-			
-			DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
-			
-			DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
-			
-			
-			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
-			
-			DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
-							.projectFirst(0).projectSecond(0);
-			
-			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
-			
-			mainResult.print();
-			
-			Plan p = env.createProgramPlan();
-			
-			// optimizer should be able to translate this
-			OptimizedPlan op = compileNoStats(p);
-			
-			// job graph generator should be able to translate this
-			new JobGraphGenerator().compileJobGraph(op);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
deleted file mode 100644
index 2b42f85..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class PartitionPushdownTest extends CompilerTestBase {
-
-	@Test
-	public void testPartitioningNotPushedDown() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-			
-			input
-				.groupBy(0, 1).sum(2)
-				.groupBy(0).sum(1)
-				.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			
-			SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
-			SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
-			SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys());
-			
-			assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy());
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testPartitioningReused() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-			
-			input
-				.groupBy(0).sum(1)
-				.groupBy(0, 1).sum(2)
-				.print();
-			
-			Plan p = env.createProgramPlan();
-			OptimizedPlan op = compileNoStats(p);
-			
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			
-			SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
-			SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
-			
-			assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy());
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
deleted file mode 100644
index 16684dc..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ /dev/null
@@ -1,845 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.translation.JavaPlan;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class PartitioningReusageTest extends CompilerTestBase {
-
-	@Test
-	public void noPreviousPartitioningJoin1() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-					.where(0,1).equalTo(0,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-
-	}
-
-	@Test
-	public void noPreviousPartitioningJoin2() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-
-	}
-
-	@Test
-	public void reuseSinglePartitioningJoin1() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(0,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-
-	}
-
-	@Test
-	public void reuseSinglePartitioningJoin2() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseSinglePartitioningJoin3() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.join(set2.partitionByHash(2, 1)
-							.map(new MockMapper())
-							.withForwardedFields("2;1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseSinglePartitioningJoin4() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0)
-				.map(new MockMapper()).withForwardedFields("0")
-				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseSinglePartitioningJoin5() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.join(set2.partitionByHash(2)
-							.map(new MockMapper())
-							.withForwardedFields("2"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin1() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.join(set2.partitionByHash(0,1)
-							.map(new MockMapper())
-							.withForwardedFields("0;1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(0,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-
-	@Test
-	public void reuseBothPartitioningJoin2() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.join(set2.partitionByHash(1,2)
-								.map(new MockMapper())
-								.withForwardedFields("1;2"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin3() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0)
-				.map(new MockMapper()).withForwardedFields("0")
-				.join(set2.partitionByHash(2,1)
-								.map(new MockMapper())
-								.withForwardedFields("2;1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,1).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin4() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0,2)
-				.map(new MockMapper()).withForwardedFields("0;2")
-				.join(set2.partitionByHash(1)
-								.map(new MockMapper())
-								.withForwardedFields("1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,2).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin5() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(2)
-				.map(new MockMapper()).withForwardedFields("2")
-				.join(set2.partitionByHash(1)
-								.map(new MockMapper())
-								.withForwardedFields("1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,2).equalTo(2,1).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin6() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(0)
-				.map(new MockMapper()).withForwardedFields("0")
-				.join(set2.partitionByHash(1)
-								.map(new MockMapper())
-								.withForwardedFields("1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,2).equalTo(1,2).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-	@Test
-	public void reuseBothPartitioningJoin7() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
-				.partitionByHash(2)
-				.map(new MockMapper()).withForwardedFields("2")
-				.join(set2.partitionByHash(1)
-								.map(new MockMapper())
-								.withForwardedFields("1"),
-						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
-				.where(0,2).equalTo(1,2).with(new MockJoin());
-
-		joined.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidJoinInputProperties(join);
-	}
-
-
-	@Test
-	public void noPreviousPartitioningCoGroup1() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
-				.coGroup(set2)
-				.where(0,1).equalTo(0,1).with(new MockCoGroup());
-
-		coGrouped.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidCoGroupInputProperties(coGroup);
-
-	}
-
-	@Test
-	public void noPreviousPartitioningCoGroup2() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
-				.coGroup(set2)
-				.where(0,1).equalTo(2,1).with(new MockCoGroup());
-
-		coGrouped.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidCoGroupInputProperties(coGroup);
-
-	}
-
-	@Test
-	public void reuseSinglePartitioningCoGroup1() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.coGroup(set2)
-				.where(0,1).equalTo(0,1).with(new MockCoGroup());
-
-		coGrouped.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidCoGroupInputProperties(coGroup);
-
-	}
-
-	@Test
-	public void reuseSinglePartitioningCoGroup2() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
-				.partitionByHash(0,1)
-				.map(new MockMapper()).withForwardedFields("0;1")
-				.coGroup(set2)
-				.where(0,1).equalTo(2,1).with(new MockCoGroup());
-
-		coGrouped.print();
-		JavaPlan plan = env.createProgramPlan();
-		OptimizedPlan oPlan = compileWithStats(plan);
-
-		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
-		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
-
-		checkValidCoGroupInputProperties(coGroup);
-	}
-
-	/**
-	 * CoGroup on keys (0,1) = (2,1) where only the second input is explicitly hash partitioned
-	 * on (2,1) and passed through a mapper that forwards fields 2 and 1.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseSinglePartitioningCoGroup3() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// second input: explicit partitioning, forwarded through the mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(2, 1)
-				.map(new MockMapper())
-				.withForwardedFields("2;1");
-
-		left
-				.coGroup(partitionedRight)
-				.where(0, 1).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,1) = (2,1) where only the first input is explicitly hash partitioned,
-	 * and only on field 0 (a subset of its key fields), then passed through a mapper that
-	 * forwards field 0.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseSinglePartitioningCoGroup4() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// first input: explicit partitioning on a single field, forwarded through the mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(0)
-				.map(new MockMapper())
-				.withForwardedFields("0");
-
-		partitionedLeft
-				.coGroup(right)
-				.where(0, 1).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,1) = (2,1) where only the second input is explicitly hash partitioned,
-	 * and only on field 2 (a subset of its key fields), then passed through a mapper that
-	 * forwards field 2.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseSinglePartitioningCoGroup5() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// second input: explicit partitioning on a single field, forwarded through the mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(2)
-				.map(new MockMapper())
-				.withForwardedFields("2");
-
-		left
-				.coGroup(partitionedRight)
-				.where(0, 1).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,1) = (0,1) where both inputs are explicitly hash partitioned on (0,1)
-	 * and each is passed through a mapper that forwards fields 0 and 1.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup1() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(0, 1)
-				.map(new MockMapper())
-				.withForwardedFields("0;1");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(0, 1)
-				.map(new MockMapper())
-				.withForwardedFields("0;1");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 1).equalTo(0, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-
-	/**
-	 * CoGroup on keys (0,1) = (2,1) where the first input is hash partitioned on (0,1) and the
-	 * second on (1,2); each is passed through a mapper that forwards its partitioning fields.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup2() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(0, 1)
-				.map(new MockMapper())
-				.withForwardedFields("0;1");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(1, 2)
-				.map(new MockMapper())
-				.withForwardedFields("1;2");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 1).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,1) = (2,1) where the first input is hash partitioned on (0) and the
-	 * second on (2,1); each is passed through a mapper that forwards its partitioning fields.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup3() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(0)
-				.map(new MockMapper())
-				.withForwardedFields("0");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(2, 1)
-				.map(new MockMapper())
-				.withForwardedFields("2;1");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 1).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,2) = (2,1) where the first input is hash partitioned on (0,2) and the
-	 * second on (1); each is passed through a mapper that forwards its partitioning fields.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup4() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(0, 2)
-				.map(new MockMapper())
-				.withForwardedFields("0;2");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(1)
-				.map(new MockMapper())
-				.withForwardedFields("1");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 2).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,2) = (2,1) where the first input is hash partitioned on (2) and the
-	 * second on (1); each is passed through a mapper that forwards its partitioning field.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup5() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(2)
-				.map(new MockMapper())
-				.withForwardedFields("2");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(1)
-				.map(new MockMapper())
-				.withForwardedFields("1");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 2).equalTo(2, 1)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,2) = (1,2) where both inputs are hash partitioned on (2) and each is
-	 * passed through a mapper that forwards field 2.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup6() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(2)
-				.map(new MockMapper())
-				.withForwardedFields("2");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(2)
-				.map(new MockMapper())
-				.withForwardedFields("2");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 2).equalTo(1, 2)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-	/**
-	 * CoGroup on keys (0,2) = (1,2) where the first input is hash partitioned on (2) and the
-	 * second on (1); each is passed through a mapper that forwards its partitioning field.
-	 * The inputs of the resulting coGroup node are checked with
-	 * {@link #checkValidCoGroupInputProperties(DualInputPlanNode)}.
-	 */
-	@Test
-	public void reuseBothPartitioningCoGroup7() {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Integer, Integer>> left = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-		DataSet<Tuple3<Integer, Integer, Integer>> right = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
-
-		// both inputs: explicit partitioning, forwarded through a mapper
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedLeft = left
-				.partitionByHash(2)
-				.map(new MockMapper())
-				.withForwardedFields("2");
-		DataSet<Tuple3<Integer, Integer, Integer>> partitionedRight = right
-				.partitionByHash(1)
-				.map(new MockMapper())
-				.withForwardedFields("1");
-
-		partitionedLeft
-				.coGroup(partitionedRight)
-				.where(0, 2).equalTo(1, 2)
-				.with(new MockCoGroup())
-				.print();
-
-		OptimizedPlan optimized = compileWithStats(env.createProgramPlan());
-
-		// the coGroup is the direct predecessor of the only sink
-		SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
-		checkValidCoGroupInputProperties((DualInputPlanNode) sinkNode.getInput().getSource());
-	}
-
-
-
-	/**
-	 * Checks that the global properties of the two inputs of a join node form a valid combination.
-	 *
-	 * <p>Accepted combinations:
-	 * <ul>
-	 *   <li>both inputs hash partitioned: both must be partitioned on the same number of fields,
-	 *       every partitioning field must be one of the join keys of its input, and the i-th
-	 *       partitioning fields of both inputs must sit at the same position in their key lists;</li>
-	 *   <li>one input fully replicated and the other randomly partitioned: accepted without
-	 *       any field check.</li>
-	 * </ul>
-	 *
-	 * @param join the join plan node whose inputs are checked
-	 * @throws UnsupportedOperationException if the inputs have any other combination of partitionings
-	 */
-	private void checkValidJoinInputProperties(DualInputPlanNode join) {
-
-		GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
-		GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
-
-		if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-				inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
-
-			// check that both inputs are hash partitioned on the same fields
-			FieldList pFields1 = inProps1.getPartitioningFields();
-			FieldList pFields2 = inProps2.getPartitioningFields();
-
-			assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
-					pFields1.size() == pFields2.size());
-
-			FieldList reqPFields1 = join.getKeysForInput1();
-			FieldList reqPFields2 = join.getKeysForInput2();
-
-			for(int i=0; i<pFields1.size(); i++) {
-
-				// get fields
-				int f1 = pFields1.get(i);
-				int f2 = pFields2.get(i);
-
-				// check that field positions in original key field list are identical
-				int pos1 = getPosInFieldList(f1, reqPFields1);
-				int pos2 = getPosInFieldList(f2, reqPFields2);
-
-				if(pos1 < 0) {
-					fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
-				}
-				if(pos2 < 0) {
-					fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
-				}
-				if(pos1 != pos2) {
-					fail("Inputs are not partitioned on the same key fields");
-				}
-			}
-
-		}
-		else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
-				inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
-			// we are good. No need to check for fields
-		}
-		else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED &&
-				inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
-			// we are good. No need to check for fields
-		}
-		else {
-			// name the operator this method actually checks (join, not coGroup) and report
-			// the partitionings that were found, so the failure can be diagnosed
-			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned " +
-					"or replicated / randomly partitioned join inputs. Input 1: " + inProps1.getPartitioning() +
-					", Input 2: " + inProps2.getPartitioning());
-		}
-
-	}
-
-	private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
-
-		GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties();
-		GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties();
-
-		if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-				inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
-
-			// check that both inputs are hash partitioned on the same fields
-			FieldList pFields1 = inProps1.getPartitioningFields();
-			FieldList pFields2 = inProps2.getPartitioningFields();
-
-			assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
-					pFields1.size() == pFields2.size());
-
-			FieldList reqPFields1 = coGroup.getKeysForInput1();
-			FieldList reqPFields2 = coGroup.getKeysForInput2();
-
-			for(int i=0; i<pFields1.size(); i++) {
-
-				// get fields
-				int f1 = pFields1.get(i);
-				int f2 = pFields2.get(i);
-
-				// check that field positions in original key field list are identical
-				int pos1 = getPosInFieldList(f1, reqPFields1);
-				int pos2 = getPosInFieldList(f2, reqPFields2);
-
-				if(pos1 < 0) {
-					fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
-				}
-				if(pos2 < 0) {
-					fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
-				}
-				if(pos1 != pos2) {
-					fail("Inputs are not partitioned on the same key fields");
-				}
-			}
-
-		}
-		else {
-			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
-		}
-
-	}
-
-	/**
-	 * Looks up the position of a field in a field list.
-	 *
-	 * @param field the field to search for
-	 * @param list the list to search in
-	 * @return the index of the first occurrence of {@code field} in {@code list},
-	 *         or -1 if the list does not contain the field
-	 */
-	private int getPosInFieldList(int field, FieldList list) {
-
-		for(int idx = 0; idx < list.size(); idx++) {
-			if(field == list.get(idx)) {
-				return idx;
-			}
-		}
-		// ran past the end without a match
-		return -1;
-
-	}
-
-
-
-	public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
-		@Override
-		public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
-			return null;
-		}
-	}
-
-	public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
-			Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
-
-		@Override
-		public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
-			return null;
-		}
-	}
-
-	public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
-				Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
-
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
-							Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
-
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
deleted file mode 100644
index 86f01b0..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Tests that check, on optimized plans, that specific operator inputs carry a temp mode
- * which breaks the pipeline ({@code getTempMode().breaksPipeline()}).
- *
- * <p>The tests let exceptions propagate to the test runner instead of catching them and
- * calling {@code fail(e.getMessage())}, so that the full stack trace ends up in the report
- * and exceptions without a message are still reported properly.
- */
-@SuppressWarnings("serial")
-public class PipelineBreakerTest extends CompilerTestBase {
-
-	/**
-	 * A data set is consumed both as the regular input of a mapper chain and as a broadcast
-	 * set of the last mapper; the input of that mapper must break the pipeline.
-	 */
-	@Test
-	public void testPipelineBreakerWithBroadcastVariable() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(64);
-
-		DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-
-		DataSet<Long> result = source.map(new IdentityMapper<Long>())
-									.map(new IdentityMapper<Long>())
-										.withBroadcastSet(source, "bc");
-
-		result.print();
-
-		SinkPlanNode sink = compileAndGetSink(env);
-		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-
-		assertTrue(mapper.getInput().getTempMode().breaksPipeline());
-	}
-
-	/**
-	 * A mapper receives two broadcast sets, one of which is a reduce over data derived from
-	 * the same source as the mapper's regular input; the input of the mapper must break the pipeline.
-	 */
-	@Test
-	public void testPipelineBreakerBroadcastedAllReduce() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(64);
-
-		DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-
-		DataSet<Long> bcInput1 = sourceWithMapper
-									.map(new IdentityMapper<Long>())
-									.reduce(new SelectOneReducer<Long>());
-		DataSet<Long> bcInput2 = env.generateSequence(1, 10);
-
-		DataSet<Long> result = sourceWithMapper
-				.map(new IdentityMapper<Long>())
-						.withBroadcastSet(bcInput1, "bc1")
-						.withBroadcastSet(bcInput2, "bc2");
-
-		result.print();
-
-		SinkPlanNode sink = compileAndGetSink(env);
-		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-
-		assertTrue(mapper.getInput().getTempMode().breaksPipeline());
-	}
-
-	/**
-	 * Inside a bulk iteration, a mapper receives the iteration's partial solution and a reduce
-	 * result as broadcast sets; the input of the mapper (the root of the step function) must
-	 * break the pipeline.
-	 */
-	@Test
-	public void testPipelineBreakerBroadcastedPartialSolution() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(64);
-
-		DataSet<Long> initialSource = env.generateSequence(1, 10);
-		IterativeDataSet<Long> iteration = initialSource.iterate(100);
-
-		DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
-
-		DataSet<Long> bcInput1 = sourceWithMapper
-									.map(new IdentityMapper<Long>())
-									.reduce(new SelectOneReducer<Long>());
-
-		DataSet<Long> result = sourceWithMapper
-				.map(new IdentityMapper<Long>())
-						.withBroadcastSet(iteration, "bc2")
-						.withBroadcastSet(bcInput1, "bc1");
-
-		iteration.closeWith(result).print();
-
-		SinkPlanNode sink = compileAndGetSink(env);
-		BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
-		SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
-
-		assertTrue(mapper.getInput().getTempMode().breaksPipeline());
-	}
-
-	/**
-	 * Crosses a mapped data set with its own source under each of the four nested-loop local
-	 * strategy hints and checks which cross input breaks the pipeline: the first input for the
-	 * BLOCKED_OUTER_FIRST and STREAMED_OUTER_FIRST hints, the second input for the
-	 * BLOCKED_OUTER_SECOND and STREAMED_OUTER_SECOND hints.
-	 *
-	 * <p>NOTE(review): the method name contains a typo ("Pileline"); it is kept unchanged
-	 * because it is the public name of the test.
-	 */
-	@Test
-	public void testPilelineBreakerWithCross() throws Exception {
-		{
-			DualInputPlanNode cross = compileCrossWithLocalStrategyHint(
-					Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
-			assertTrue(cross.getInput1().getTempMode().breaksPipeline());
-		}
-
-		{
-			DualInputPlanNode cross = compileCrossWithLocalStrategyHint(
-					Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
-			assertTrue(cross.getInput2().getTempMode().breaksPipeline());
-		}
-
-		{
-			DualInputPlanNode cross = compileCrossWithLocalStrategyHint(
-					Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
-			assertTrue(cross.getInput1().getTempMode().breaksPipeline());
-		}
-
-		{
-			DualInputPlanNode cross = compileCrossWithLocalStrategyHint(
-					Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
-			assertTrue(cross.getInput2().getTempMode().breaksPipeline());
-		}
-	}
-
-	/**
-	 * Creates the program plan of the given environment, compiles it without statistics and
-	 * returns the first data sink of the optimized plan.
-	 *
-	 * @param env the environment holding the program to compile
-	 * @return the first sink node of the optimized plan
-	 */
-	private SinkPlanNode compileAndGetSink(ExecutionEnvironment env) {
-		Plan p = env.createProgramPlan();
-		OptimizedPlan op = compileNoStats(p);
-		return op.getDataSinks().iterator().next();
-	}
-
-	/**
-	 * Builds a program that crosses a mapped sequence with its own source, passing the given
-	 * local strategy hint to the cross operator, and compiles it.
-	 *
-	 * @param localStrategyHint the value to set for {@code Optimizer.HINT_LOCAL_STRATEGY}
-	 * @return the cross plan node, i.e. the direct predecessor of the sink
-	 * @throws Exception if building the program fails
-	 */
-	private DualInputPlanNode compileCrossWithLocalStrategyHint(String localStrategyHint) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(64);
-
-		DataSet<Long> initialSource = env.generateSequence(1, 10);
-
-		Configuration conf = new Configuration();
-		conf.setString(Optimizer.HINT_LOCAL_STRATEGY, localStrategyHint);
-		initialSource
-			.map(new IdentityMapper<Long>())
-			.cross(initialSource).withParameters(conf)
-			.print();
-
-		SinkPlanNode sink = compileAndGetSink(env);
-		return (DualInputPlanNode) sink.getInput().getSource();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
deleted file mode 100644
index 7be2b16..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.translation.JavaPlan;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@SuppressWarnings({"serial"})
-public class PropertyDataSourceTest extends CompilerTestBase {
-
-	// Empty input collection; it is only passed to env.fromCollection(...) to create a data source.
-	private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>();
-	// Explicit type information for Tuple3<Long, SomePojo, String>, passed to env.fromCollection(...)
-	// together with the (empty) collection above.
-	private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
-			BasicTypeInfo.LONG_TYPE_INFO,
-			TypeExtractor.createTypeInfo(SomePojo.class),
-			BasicTypeInfo.STRING_TYPE_INFO
-	);
-
-	/**
-	 * Declares the splits of a CSV source as partitioned by field 0 and checks the properties
-	 * of the optimized source node: ANY_PARTITIONING on the field set {0}, no grouped fields
-	 * and no ordering.
-	 */
-	@Test
-	public void checkSinglePartitionedSource1() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		// assertEquals / assertNull report the offending value on failure
-		Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray()));
-		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
-		Assert.assertNull(lprops.getGroupedFields());
-		Assert.assertNull(lprops.getOrdering());
-
-	}
-
-	/**
-	 * Declares the splits of a CSV source as partitioned by fields (1, 0) and checks the
-	 * properties of the optimized source node: ANY_PARTITIONING on the field set {0, 1},
-	 * no grouped fields and no ordering.
-	 */
-	@Test
-	public void checkSinglePartitionedSource2() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(1, 0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		// assertEquals / assertNull report the offending value on failure
-		Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray()));
-		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
-		Assert.assertNull(lprops.getGroupedFields());
-		Assert.assertNull(lprops.getOrdering());
-
-	}
-
-	/**
-	 * Declares the splits of a collection source of {@code Tuple3<Long, SomePojo, String>} as
-	 * partitioned by the field expression {@code "*"} and checks the properties of the optimized
-	 * source node: ANY_PARTITIONING on the field set {0, 1, 2, 3, 4}, no grouped fields and
-	 * no ordering.
-	 */
-	@Test
-	public void checkSinglePartitionedSource3() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("*");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		// assertEquals / assertNull report the offending value on failure
-		Assert.assertEquals(new FieldSet(0, 1, 2, 3, 4), new FieldSet(gprops.getPartitioningFields().toArray()));
-		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
-		Assert.assertNull(lprops.getGroupedFields());
-		Assert.assertNull(lprops.getOrdering());
-
-	}
-
-	/**
-	 * Declares the splits of a collection source of {@code Tuple3<Long, SomePojo, String>} as
-	 * partitioned by the field expression {@code "f1"} and checks the properties of the optimized
-	 * source node: ANY_PARTITIONING on the field set {1, 2, 3}, no grouped fields and no ordering.
-	 */
-	@Test
-	public void checkSinglePartitionedSource4() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		// assertEquals / assertNull report the offending value on failure
-		Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(gprops.getPartitioningFields().toArray()));
-		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
-		Assert.assertNull(lprops.getGroupedFields());
-		Assert.assertNull(lprops.getOrdering());
-
-	}
-
-	/**
-	 * Declares the splits of a collection source of {@code Tuple3<Long, SomePojo, String>} as
-	 * partitioned by the field expression {@code "f1.stringField"} and checks the properties of
-	 * the optimized source node: ANY_PARTITIONING on the field set {3}, no grouped fields and
-	 * no ordering.
-	 */
-	@Test
-	public void checkSinglePartitionedSource5() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1.stringField");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		// assertEquals / assertNull report the offending value on failure
-		Assert.assertEquals(new FieldSet(3), new FieldSet(gprops.getPartitioningFields().toArray()));
-		Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning());
-		Assert.assertNull(lprops.getGroupedFields());
-		Assert.assertNull(lprops.getOrdering());
-
-	}
-
-	@Test
-	public void checkSinglePartitionedSource6() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1.intField; f2");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedSource7() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("byDate", 1, 0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
-		Assert.assertTrue(gprops.getCustomPartitioner() != null);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedGroupedSource1() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0)
-				.splitsGroupedBy(0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedGroupedSource2() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0)
-				.splitsGroupedBy(1, 0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedGroupedSource3() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(1)
-				.splitsGroupedBy(0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedGroupedSource4() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0, 1)
-				.splitsGroupedBy(0);
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedGroupedSource5() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f2")
-				.splitsGroupedBy("f2");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedGroupedSource6() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1.intField")
-				.splitsGroupedBy("f0; f1.intField");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedGroupedSource7() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1.intField")
-				.splitsGroupedBy("f1");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedGroupedSource8() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1")
-				.splitsGroupedBy("f1.stringField");
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedOrderedSource1() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(1)
-				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedOrderedSource2() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(1)
-				.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedOrderedSource3() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0)
-				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedOrderedSource4() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy(0, 1)
-				.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedOrderedSource5() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-			.splitsPartitionedBy("f1.intField")
-			.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkSinglePartitionedOrderedSource6() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1.intField")
-				.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-	@Test
-	public void checkSinglePartitionedOrderedSource7() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
-
-		data.getSplitDataProperties()
-				.splitsPartitionedBy("f1")
-				.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
-
-		data.print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
-
-		GlobalProperties gprops = sourceNode.getGlobalProperties();
-		LocalProperties lprops = sourceNode.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3)));
-		Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
-		Assert.assertTrue(lprops.getGroupedFields() == null);
-		Assert.assertTrue(lprops.getOrdering() == null);
-
-	}
-
-
-	@Test
-	public void checkCoPartitionedSources1() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data1 =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data1.getSplitDataProperties()
-				.splitsPartitionedBy("byDate", 0);
-
-		DataSource<Tuple2<Long, String>> data2 =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data2.getSplitDataProperties()
-				.splitsPartitionedBy("byDate", 0);
-
-		data1.union(data2).print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
-		SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
-
-		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
-		LocalProperties lprops1 = sourceNode1.getLocalProperties();
-		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
-		LocalProperties lprops2 = sourceNode2.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
-		Assert.assertTrue(lprops1.getGroupedFields() == null);
-		Assert.assertTrue(lprops1.getOrdering() == null);
-
-		Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
-		Assert.assertTrue(lprops2.getGroupedFields() == null);
-		Assert.assertTrue(lprops2.getOrdering() == null);
-
-		Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
-	}
-
-	@Test
-	public void checkCoPartitionedSources2() {
-
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
-		DataSource<Tuple2<Long, String>> data1 =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data1.getSplitDataProperties()
-				.splitsPartitionedBy("byCountry", 0);
-
-		DataSource<Tuple2<Long, String>> data2 =
-				env.readCsvFile("/some/path").types(Long.class, String.class);
-
-		data2.getSplitDataProperties()
-				.splitsPartitionedBy("byDate", 0);
-
-		data1.union(data2).print();
-
-		JavaPlan plan = env.createProgramPlan();
-
-		// submit the plan to the compiler
-		OptimizedPlan oPlan = compileNoStats(plan);
-
-		// check the optimized Plan
-		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-		SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
-		SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
-
-		GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
-		LocalProperties lprops1 = sourceNode1.getLocalProperties();
-		GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
-		LocalProperties lprops2 = sourceNode2.getLocalProperties();
-
-		Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
-		Assert.assertTrue(lprops1.getGroupedFields() == null);
-		Assert.assertTrue(lprops1.getOrdering() == null);
-
-		Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
-		Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
-		Assert.assertTrue(lprops2.getGroupedFields() == null);
-		Assert.assertTrue(lprops2.getOrdering() == null);
-
-		Assert.assertTrue(!gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
-	}
-
-
-	public static class SomePojo {
-		public double doubleField;
-		public int intField;
-		public String stringField;
-	}
-
-}
-
-


Mime
View raw message