flink-commits mailing list archives

From: se...@apache.org
Subject: [46/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes
Date: Fri, 20 Mar 2015 10:07:25 GMT
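
The hunks below apply the rename mechanically: org.apache.flink.optimizer.PactCompiler becomes Optimizer, and org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator becomes JobGraphGenerator. As a minimal sketch (not part of this patch), the hypothetical helper below shows the compile pipeline the migrated tests use after the rename, assuming the renamed classes keep the constructors and methods visible in the hunks: Optimizer(DataStatistics), Optimizer#compile(Plan), and JobGraphGenerator#compileJobGraph(OptimizedPlan).

// Hypothetical helper, not part of the patch: the post-rename compile pipeline
// used by the tests in this commit (e.g. TaskFailureITCase, ReduceITCase, UnionSinkITCase).
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class RenamedOptimizerUsage {

	// Optimize a Plan and translate it to a JobGraph with the renamed classes.
	public static JobGraph compile(Plan plan) {
		Optimizer pc = new Optimizer(new DataStatistics());   // was: new PactCompiler(new DataStatistics())
		OptimizedPlan op = pc.compile(plan);
		JobGraphGenerator jgg = new JobGraphGenerator();      // was: new NepheleJobGraphGenerator()
		return jgg.compileJobGraph(op);
	}
}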
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
index 5a0a6fc..bf2bbae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -77,7 +77,7 @@ public class IterativeKMeansTest extends CompilerTestBase {
 		OptimizedPlan plan = compileWithStats(p);
 		checkPlan(plan);
 		
-		new NepheleJobGraphGenerator().compileJobGraph(plan);
+		new JobGraphGenerator().compileJobGraph(plan);
 	}
 
 	@Test
@@ -89,7 +89,7 @@ public class IterativeKMeansTest extends CompilerTestBase {
 		OptimizedPlan plan = compileNoStats(p);
 		checkPlan(plan);
 		
-		new NepheleJobGraphGenerator().compileJobGraph(plan);
+		new JobGraphGenerator().compileJobGraph(plan);
 	}
 	
 	private void checkPlan(OptimizedPlan plan) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index cc8f788..e6a1e69 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 
 @SuppressWarnings("serial")
 public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
@@ -79,7 +79,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 			assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass());
 			assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass());
 			
-			new NepheleJobGraphGenerator().compileJobGraph(optPlan);
+			new JobGraphGenerator().compileJobGraph(optPlan);
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 731e344..f05bd25 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -68,7 +68,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
 			IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
 			
 			Configuration cfg = new Configuration();
-			cfg.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+			cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
 			
 			DataSet<Tuple2<Long, Double>> newRanks = iteration
 					// join pages with outgoing edges and distribute rank

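The optimizer hint constants move with the class: PactCompiler.HINT_* is referenced as Optimizer.HINT_* after the rename, with the constant names unchanged. A small hypothetical snippet (not part of the patch), assuming only the constants shown in the hunks, illustrates building such a hint configuration; the tests pass it to an operator via withParameters(cfg), as GroupReduceITCase does further down.

// Hypothetical snippet: builds a Configuration carrying a renamed optimizer hint.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;

public class OptimizerHintExample {

	// was: PactCompiler.HINT_LOCAL_STRATEGY / PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND
	public static Configuration hashBuildSecondHint() {
		Configuration cfg = new Configuration();
		cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
		return cfg;
	}
}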
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
index 1cf92e6..b348333 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.compiler.plandump;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
@@ -89,7 +89,7 @@ public class PreviewPlanDumpTest {
 	
 	private void dump(Plan p) {
 		try {
-			List<DataSinkNode> sinks = PactCompiler.createPreOptimizedPlan(p);
+			List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
 			String json = dumper.getPactPlanAsJSON(sinks);
 			JsonParser parser = new JsonFactory().createJsonParser(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index b88eb4e..41b84e6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -27,11 +27,6 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected String textPath;
 	protected String resultPath;
 
-	public WordCountITCase(){
-		setDegreeOfParallelism(4);
-		setNumTaskManagers(2);
-		setTaskManagerNumSlots(2);
-	}
 
 	@Override
 	protected void preSubmit() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index a5c2bb4..9021c6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -88,11 +88,11 @@ public class TaskFailureITCase extends FailingTestBase {
 		plan.setDefaultParallelism(DOP);
 
 		// optimize and compile plan 
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);
 		
 		// return job graph of failing job
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
 
@@ -118,11 +118,11 @@ public class TaskFailureITCase extends FailingTestBase {
 		plan.setDefaultParallelism(4);
 
 		// optimize and compile plan
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);
 
 		// return job graph of working job
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 037610e..a0e3468 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -39,12 +39,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 	protected String textPath;
 	protected String resultPath;
 
-	public WordCountMapredITCase(){
-//		setDegreeOfParallelism(4);
-//		setNumTaskManagers(2);
-//		setTaskManagerNumSlots(2);
-	}
-
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
@@ -59,7 +53,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//		env.setDegreeOfParallelism(1);
 
 
 		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index 3bdaa22..fee49bf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -39,12 +39,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 	protected String textPath;
 	protected String resultPath;
 
-	public WordCountMapreduceITCase(){
-//		setDegreeOfParallelism(4);
-//		setNumTaskManagers(2);
-//		setTaskManagerNumSlots(2);
-	}
-
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 79d49aa..e5f91b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
@@ -377,7 +377,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				.map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
 
 		Configuration cfg = new Configuration();
-		cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+		cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
 		DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
 				.withParameters(cfg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index d92897d..220611d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
@@ -156,9 +156,9 @@ public class CoGroupITCase extends RecordAPITestBase {
 		CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0)
 			.build();
 		testCoGrouper.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-		testCoGrouper.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+		testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("CoGroupTest#LocalStrategy", ""));
-		testCoGrouper.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+		testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
 				config.getString("CoGroupTest#ShipStrategy", ""));
 
 		FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath);
@@ -181,9 +181,9 @@ public class CoGroupITCase extends RecordAPITestBase {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
+		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
 
-		String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
+		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
 		for (String localStrategy : localStrategies) {
 			for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index f72d146..f6b4127 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.api.java.record.operators.CrossOperator;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -132,20 +132,20 @@ public class CrossITCase extends RecordAPITestBase {
 
 		CrossOperator testCross = CrossOperator.builder(new TestCross()).build();
 		testCross.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-		testCross.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+		testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("CrossTest#LocalStrategy", ""));
 		if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
-			testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-			testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
 		} else if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
-			testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-			testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
 		} else {
-			testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
 					config.getString("CrossTest#ShipStrategy", ""));
 		}
 
@@ -170,10 +170,10 @@ public class CrossITCase extends RecordAPITestBase {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
-				PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
-				PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
-				PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
+		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
+				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
+				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
+				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
 
 		String[] shipStrategies = { "BROADCAST_FIRST", "BROADCAST_SECOND"
 		// PactCompiler.HINT_SHIP_STRATEGY_BROADCAST

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
index 6906d36..02a6e38 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -122,20 +122,20 @@ public class JoinITCase extends RecordAPITestBase {
 		JoinOperator testMatcher = JoinOperator.builder(new TestMatcher(), StringValue.class, 0, 0)
 			.build();
 		testMatcher.setDegreeOfParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-		testMatcher.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+		testMatcher.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("MatchTest#LocalStrategy", ""));
 		if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
-			testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-			testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
 		} else if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
-			testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
-			testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
+			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
+			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
 		} else {
-			testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
 					config.getString("MatchTest#ShipStrategy", ""));
 		}
 
@@ -160,10 +160,10 @@ public class JoinITCase extends RecordAPITestBase {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
-				PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
+		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
+				Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
 
-		String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"};
+		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"};
 
 		for (String localStrategy : localStrategies) {
 			for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index 5f5f33d..3c8d372 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
@@ -119,9 +119,9 @@ public class ReduceITCase extends RecordAPITestBase {
 		ReduceOperator testReducer = ReduceOperator.builder(new TestReducer(), StringValue.class, 0)
 			.build();
 		testReducer.setDegreeOfParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
-		testReducer.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+		testReducer.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
 				config.getString("ReduceTest#LocalStrategy", ""));
-		testReducer.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+		testReducer.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
 				config.getString("ReduceTest#ShipStrategy", ""));
 
 		FileDataSink output = new FileDataSink(
@@ -133,10 +133,10 @@ public class ReduceITCase extends RecordAPITestBase {
 
 		Plan plan = new Plan(output);
 
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);
 
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 
 	}
@@ -151,8 +151,8 @@ public class ReduceITCase extends RecordAPITestBase {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 
-		String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT };
-		String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
+		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT };
+		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
 		for (String localStrategy : localStrategies) {
 			for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
index 4b9bc2a..944de98 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
@@ -124,10 +124,10 @@ public class UnionSinkITCase extends RecordAPITestBase {
 		Plan plan = new Plan(output);
 		plan.setDefaultParallelism(DOP);
 
-		PactCompiler pc = new PactCompiler(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics());
 		OptimizedPlan op = pc.compile(plan);
 
-		NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+		JobGraphGenerator jgg = new JobGraphGenerator();
 		return jgg.compileJobGraph(op);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 853a477..33e0807 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.{CrazyNested, POJO, MutableTuple3, CustomType}
-import org.apache.flink.optimizer.PactCompiler
+import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -324,7 +324,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.get3TupleDataSet(env).map(t => t).setParallelism(4)
 
     val cfg: Configuration = new Configuration
-    cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)
+    cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION)
 
     val reduceDs =  ds.reduceGroup(new Tuple3AllGroupReduceWithCombine).withParameters(cfg)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index 8ac52cb..969f970 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -54,17 +54,17 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {
@@ -124,17 +124,17 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {
@@ -197,23 +197,23 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = keyExtractor.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput.getShipStrategy)
-      assertEquals(parallelism, keyRemover.getDegreeOfParallelism)
+      assertEquals(parallelism, keyRemover.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput.getShipStrategy)
-      assertEquals(parallelism, keyExtractor.getDegreeOfParallelism)
+      assertEquals(parallelism, keyExtractor.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {

