flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/4] flink git commit: [FLINK-1771] Add support for submitting single jobs to a detached YARN session
Date Tue, 31 Mar 2015 08:49:25 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index 3cf081f..d6b9444 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index f6885c5..3af64fc 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 230cc6b..25643a4 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 1fe16bb..3a24ce1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Visitor;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 40b54e0..65e5025 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 92b4fc5..1001626 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 5d15ed8..2e52565 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index 1e4124c..e81e0ec 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 80c0bda..321ca5a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 6e7c0a3..27f367f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityMap;
 import org.apache.flink.optimizer.util.IdentityReduce;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.LongValue;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index 0273659..b4e95fb 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -26,13 +26,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index 08f7388..d52181d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -29,13 +29,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 9fd676f..5758c86 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -28,12 +28,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index f865a9f..00fd587 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 360487b..0408ca9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -30,12 +30,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 8cd4809..74e5c8c 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -26,12 +26,12 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 779b8e5..72fb81b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index eae40cf..8eedee1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -30,13 +30,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
index cb4bd78..5f69336 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -35,6 +34,7 @@ import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
index 5175d8c..13ec51a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
index 6b2691a..99f8c81 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
index b359e6b..ab83dba 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.fail;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 import org.apache.flink.api.common.Plan;
@@ -28,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index 2f9b32f..96758b1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index c0e2fa7..1bd4b8a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 57d2d54..796d4ab 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -36,6 +35,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index 0a62132..14d863d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -27,10 +27,10 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Visitor;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
index cd63b72..3f18e62 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 8bb9a76..95ee4de 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index e1b18f9..4197abb 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index f1c2233..46eb48a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -33,11 +33,11 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index e7807c9..fb7a80f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 9171cc7..8a4786f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
new file mode 100644
index 0000000..35c50d3
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * the the node in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
+
+	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
+
+	protected static final int DEFAULT_PARALLELISM = 8;
+
+	protected static final String DEFAULT_PARALLELISM_STRING = String.valueOf(DEFAULT_PARALLELISM);
+
+	private static final String CACHE_KEY = "cachekey";
+
+	// ------------------------------------------------------------------------
+
+	protected transient DataStatistics dataStats;
+
+	protected transient Optimizer withStatsCompiler;
+
+	protected transient Optimizer noStatsCompiler;
+
+	private transient int statCounter;
+
+	// ------------------------------------------------------------------------
+
+	@Before
+	public void setup() {
+		Configuration flinkConf = new Configuration();
+		this.dataStats = new DataStatistics();
+		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator(), flinkConf);
+		this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
+
+		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator(), flinkConf);
+		this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public OptimizedPlan compileWithStats(Plan p) {
+		return this.withStatsCompiler.compile(p);
+	}
+
+	public OptimizedPlan compileNoStats(Plan p) {
+		return this.noStatsCompiler.compile(p);
+	}
+
+	public static OperatorResolver getContractResolver(Plan plan) {
+		return new OperatorResolver(plan);
+	}
+
+	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
+		setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
+	}
+
+	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
+		final String key = CACHE_KEY + this.statCounter++;
+		this.dataStats.cacheBaseStatistics(stats, key);
+		source.setStatisticsKey(key);
+	}
+
+
+	public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+		return new OptimizerPlanNodeResolver(plan);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static final class OptimizerPlanNodeResolver {
+
+		private final Map<String, ArrayList<PlanNode>> map;
+
+		public OptimizerPlanNodeResolver(OptimizedPlan p) {
+			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
+
+			for (PlanNode n : p.getAllNodes()) {
+				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
+				String name = c.getName();
+
+				ArrayList<PlanNode> list = map.get(name);
+				if (list == null) {
+					list = new ArrayList<PlanNode>(2);
+					map.put(name, list);
+				}
+
+				// check whether this node is a child of a node with the same contract (aka combiner)
+				boolean shouldAdd = true;
+				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
+					PlanNode in = iter.next();
+					if (in.getOriginalOptimizerNode().getOperator() == c) {
+						// is this the child or is our node the child
+						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
+							SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
+
+							if (thisNode.getPredecessor() == otherNode) {
+								// other node is child, remove it
+								iter.remove();
+							} else if (otherNode.getPredecessor() == thisNode) {
+								shouldAdd = false;
+							}
+						} else {
+							throw new RuntimeException("Unrecodnized case in test.");
+						}
+					}
+				}
+
+				if (shouldAdd) {
+					list.add(n);
+				}
+			}
+
+			this.map = map;
+		}
+
+
+		@SuppressWarnings("unchecked")
+		public <T extends PlanNode> T getNode(String name) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name.");
+			} else if (nodes.size() != 1) {
+				throw new RuntimeException("Multiple nodes found with the given name.");
+			} else {
+				return (T) nodes.get(0);
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name and stub class.");
+			} else {
+				PlanNode found = null;
+				for (PlanNode node : nodes) {
+					if (node.getClass() == stubClass) {
+						if (found == null) {
+							found = node;
+						} else {
+							throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+						}
+					}
+				}
+				if (found == null) {
+					throw new RuntimeException("No node found with the given name and stub class.");
+				} else {
+					return (T) found;
+				}
+			}
+		}
+
+		public List<PlanNode> getNodes(String name) {
+			List<PlanNode> nodes = this.map.get(name);
+			if (nodes == null || nodes.isEmpty()) {
+				throw new RuntimeException("No node found with the given name.");
+			} else {
+				return new ArrayList<PlanNode>(nodes);
+			}
+		}
+	}
+
+	/**
+	 * Collects all DataSources of a plan to add statistics
+	 *
+	 */
+	public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
+
+		protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+		@Override
+		public boolean preVisit(Operator<?> visitable) {
+
+			if(visitable instanceof GenericDataSourceBase) {
+				sources.add((GenericDataSourceBase<?, ?>) visitable);
+			}
+			else if(visitable instanceof BulkIterationBase) {
+				((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
+			}
+
+			return true;
+		}
+
+		@Override
+		public void postVisit(Operator<?> visitable) {}
+
+		public List<GenericDataSourceBase<?, ?>> getSources() {
+			return this.sources;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
new file mode 100644
index 0000000..920b713
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.BulkIteration;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Utility to get operator instances from plans via name.
+ */
+@SuppressWarnings("deprecation")
+public class OperatorResolver implements Visitor<Operator<?>> {
+	
+	private final Map<String, List<Operator<?>>> map;
+	private Set<Operator<?>> seen;
+	
+	public OperatorResolver(Plan p) {
+		this.map = new HashMap<String, List<Operator<?>>>();
+		this.seen = new HashSet<Operator<?>>();
+		
+		p.accept(this);
+		this.seen = null;
+	}
+	
+	
+	@SuppressWarnings("unchecked")
+	public <T extends Operator<?>> T getNode(String name) {
+		List<Operator<?>> nodes = this.map.get(name);
+		if (nodes == null || nodes.isEmpty()) {
+			throw new RuntimeException("No nodes found with the given name.");
+		} else if (nodes.size() != 1) {
+			throw new RuntimeException("Multiple nodes found with the given name.");
+		} else {
+			return (T) nodes.get(0);
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
+		List<Operator<?>> nodes = this.map.get(name);
+		if (nodes == null || nodes.isEmpty()) {
+			throw new RuntimeException("No node found with the given name and stub class.");
+		} else {
+			Operator<?> found = null;
+			for (Operator<?> node : nodes) {
+				if (node.getClass() == stubClass) {
+					if (found == null) {
+						found = node;
+					} else {
+						throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+					}
+				}
+			}
+			if (found == null) {
+				throw new RuntimeException("No node found with the given name and stub class.");
+			} else {
+				return (T) found;
+			}
+		}
+	}
+	
+	public List<Operator<?>> getNodes(String name) {
+		List<Operator<?>> nodes = this.map.get(name);
+		if (nodes == null || nodes.isEmpty()) {
+			throw new RuntimeException("No node found with the given name.");
+		} else {
+			return new ArrayList<Operator<?>>(nodes);
+		}
+	}
+
+	@Override
+	public boolean preVisit(Operator<?> visitable) {
+		if (this.seen.add(visitable)) {
+			// add to  the map
+			final String name = visitable.getName();
+			List<Operator<?>> list = this.map.get(name);
+			if (list == null) {
+				list = new ArrayList<Operator<?>>(2);
+				this.map.put(name, list);
+			}
+			list.add(visitable);
+			
+			// recurse into bulk iterations
+			if (visitable instanceof BulkIteration) {
+				((BulkIteration) visitable).getNextPartialSolution().accept(this);
+			} else if (visitable instanceof DeltaIteration) {
+				((DeltaIteration) visitable).getSolutionSetDelta().accept(this);
+				((DeltaIteration) visitable).getNextWorkset().accept(this);
+			}
+			
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public void postVisit(Operator<?> visitable) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 78b99ad..4f3b4b2 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -59,7 +59,7 @@ under the License.
 				<artifactId>maven-archetype-plugin</artifactId>
 				<version>2.2</version><!--$NO-MVN-MAN-VER$-->
 				<configuration>
-					<skip>${skipTests}</skip>
+					<skip>true</skip>
 				</configuration>
 			</plugin>
 			<!-- deactivate the shade plugin for the quickstart archetypes -->

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index fd91d65..f345d6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index cb799c4..1c76e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 78e9707..626d21f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index b2f11db..50b1f24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -29,7 +29,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.MessageDigest;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 5db5ef6..69687da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 
 import java.io.EOFException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
index 9f72db0..842870d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * An exception which is thrown by the JobClient if a job is aborted as a result of a user

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 99c3f89..56ccef5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This exception is the base exception for all exceptions that denote any failure during

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
index d2c81a5..c33ed8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
index 3d672a5..3cb0b9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This exception denotes an error while submitting a job to the JobManager

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
index 10ef601..18c8c31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * An exception which is thrown by the JobClient if the job manager is no longer reachable.

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index f7518bd..5d96903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 556bb11..503a0b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index c6270b2..3fb7493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index d394e2d..848f619 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -35,7 +35,7 @@ import java.util.TimerTask;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 532107f..66bda45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 63d85b4..52a8048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import java.io.File;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d0615b3..5ce89b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ad72d13..acbc17a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8dd341f..1b91089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b838aa4..3449100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.IOUtils;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index e27b7ea..324629f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 import akka.actor.ActorRef;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index eab4344..7bd70a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index c89b7f5..56be9d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 08d6441..bf8464c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d490784..55b89b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 92e27d3..88020a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 0ea3a1c..6e84b4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+
+import org.apache.flink.api.common.JobID;
 
 public interface ResultPartitionConsumableNotifier {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f8f22b0..d43533b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
deleted file mode 100644
index 7c8d365..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobgraph;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.apache.flink.util.AbstractID;
-
-import java.nio.ByteBuffer;
-
-public final class JobID extends AbstractID {
-
-	private static final long serialVersionUID = 1L;
-	
-	public JobID() {
-		super();
-	}
-
-	public JobID(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public JobID(byte[] bytes) {
-		super(bytes);
-	}
-
-	public static JobID generate() {
-		return new JobID();
-	}
-
-	public static JobID fromByteArray(byte[] bytes) {
-		return new JobID(bytes);
-	}
-
-	public static JobID fromByteBuffer(ByteBuffer buf) {
-		long lower = buf.getLong();
-		long upper = buf.getLong();
-		return new JobID(lower, upper);
-	}
-
-	public static JobID fromHexString(String hexString) {
-		return new JobID(DatatypeConverter.parseHexBinary(hexString));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
index 3130354..5bf8ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
@@ -24,7 +24,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This class manages the accumulators for different jobs. Either the jobs are

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index ffb0dd4..008be65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.util.EnvironmentInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
index 8f16e4f..4c4a20e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
@@ -20,7 +20,7 @@
 package org.apache.flink.runtime.profiling;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This interface must be implemented by profiling components

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
index 85dedb7..d5ce137 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
 


Mime
View raw message