flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [40/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:07:19 GMT
[optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/633b0d6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/633b0d6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/633b0d6a

Branch: refs/heads/master
Commit: 633b0d6a9b25fd324f1aa007bbce246be304c099
Parents: 5757850
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Mar 17 21:44:28 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100

----------------------------------------------------------------------
 flink-clients/pom.xml                           |    2 +-
 flink-compiler/pom.xml                          |   63 -
 .../flink/optimizer/CompilerException.java      |   64 -
 .../optimizer/CompilerPostPassException.java    |   62 -
 .../apache/flink/optimizer/DataStatistics.java  |   69 -
 .../org/apache/flink/optimizer/Optimizer.java   |  571 -------
 .../flink/optimizer/costs/CostEstimator.java    |  231 ---
 .../org/apache/flink/optimizer/costs/Costs.java |  492 ------
 .../optimizer/costs/DefaultCostEstimator.java   |  265 ---
 .../dag/AbstractPartialSolutionNode.java        |  104 --
 .../flink/optimizer/dag/BinaryUnionNode.java    |  308 ----
 .../flink/optimizer/dag/BulkIterationNode.java  |  390 -----
 .../optimizer/dag/BulkPartialSolutionNode.java  |  103 --
 .../apache/flink/optimizer/dag/CoGroupNode.java |  106 --
 .../flink/optimizer/dag/CollectorMapNode.java   |   62 -
 .../apache/flink/optimizer/dag/CrossNode.java   |  138 --
 .../flink/optimizer/dag/DagConnection.java      |  290 ----
 .../flink/optimizer/dag/DataSinkNode.java       |  266 ---
 .../flink/optimizer/dag/DataSourceNode.java     |  306 ----
 .../flink/optimizer/dag/EstimateProvider.java   |   47 -
 .../apache/flink/optimizer/dag/FilterNode.java  |   72 -
 .../apache/flink/optimizer/dag/FlatMapNode.java |   65 -
 .../flink/optimizer/dag/GroupCombineNode.java   |  100 --
 .../flink/optimizer/dag/GroupReduceNode.java    |  158 --
 .../dag/InterestingPropertiesClearer.java       |   39 -
 .../flink/optimizer/dag/IterationNode.java      |   30 -
 .../apache/flink/optimizer/dag/JoinNode.java    |  187 ---
 .../org/apache/flink/optimizer/dag/MapNode.java |   66 -
 .../flink/optimizer/dag/MapPartitionNode.java   |   67 -
 .../apache/flink/optimizer/dag/MatchNode.java   |  167 --
 .../apache/flink/optimizer/dag/NoOpNode.java    |   44 -
 .../flink/optimizer/dag/OptimizerNode.java      | 1172 -------------
 .../flink/optimizer/dag/PartitionNode.java      |  148 --
 .../flink/optimizer/dag/PlanCacheCleaner.java   |   39 -
 .../apache/flink/optimizer/dag/ReduceNode.java  |   98 --
 .../flink/optimizer/dag/SingleInputNode.java    |  518 ------
 .../apache/flink/optimizer/dag/SinkJoiner.java  |  112 --
 .../flink/optimizer/dag/SolutionSetNode.java    |   99 --
 .../flink/optimizer/dag/SortPartitionNode.java  |  127 --
 .../apache/flink/optimizer/dag/TempMode.java    |   83 -
 .../flink/optimizer/dag/TwoInputNode.java       |  747 ---------
 .../flink/optimizer/dag/UnaryOperatorNode.java  |   69 -
 .../optimizer/dag/WorksetIterationNode.java     |  589 -------
 .../apache/flink/optimizer/dag/WorksetNode.java |  104 --
 .../dataproperties/GlobalProperties.java        |  500 ------
 .../dataproperties/InterestingProperties.java   |  179 --
 .../dataproperties/LocalProperties.java         |  307 ----
 .../dataproperties/PartitioningProperty.java    |  112 --
 .../RequestedGlobalProperties.java              |  486 ------
 .../RequestedLocalProperties.java               |  265 ---
 .../optimizer/deadlockdetect/DeadlockEdge.java  |   38 -
 .../optimizer/deadlockdetect/DeadlockGraph.java |  133 --
 .../deadlockdetect/DeadlockPreventer.java       |  211 ---
 .../deadlockdetect/DeadlockVertex.java          |   98 --
 .../operators/AbstractJoinDescriptor.java       |  185 --
 .../operators/AbstractOperatorDescriptor.java   |   35 -
 .../operators/AllGroupCombineProperties.java    |   73 -
 .../operators/AllGroupReduceProperties.java     |   73 -
 .../AllGroupWithPartialPreGroupProperties.java  |  100 --
 .../operators/AllReduceProperties.java          |   94 --
 .../operators/BinaryUnionOpDescriptor.java      |   97 --
 .../operators/CartesianProductDescriptor.java   |  110 --
 .../optimizer/operators/CoGroupDescriptor.java  |  239 ---
 .../CoGroupWithSolutionSetFirstDescriptor.java  |   77 -
 .../CoGroupWithSolutionSetSecondDescriptor.java |   57 -
 .../operators/CollectorMapDescriptor.java       |   75 -
 .../CrossBlockOuterFirstDescriptor.java         |   44 -
 .../CrossBlockOuterSecondDescriptor.java        |   44 -
 .../CrossStreamOuterFirstDescriptor.java        |   51 -
 .../CrossStreamOuterSecondDescriptor.java       |   51 -
 .../optimizer/operators/FilterDescriptor.java   |   68 -
 .../optimizer/operators/FlatMapDescriptor.java  |   75 -
 .../operators/GroupCombineProperties.java       |  117 --
 .../operators/GroupReduceProperties.java        |  129 --
 .../GroupReduceWithCombineProperties.java       |  169 --
 .../operators/HashJoinBuildFirstProperties.java |   89 -
 .../HashJoinBuildSecondProperties.java          |   88 -
 .../optimizer/operators/MapDescriptor.java      |   68 -
 .../operators/MapPartitionDescriptor.java       |   68 -
 .../optimizer/operators/NoOpDescriptor.java     |   70 -
 .../operators/OperatorDescriptorDual.java       |  206 ---
 .../operators/OperatorDescriptorSingle.java     |  106 --
 .../operators/PartialGroupProperties.java       |   91 -
 .../optimizer/operators/ReduceProperties.java   |  126 --
 .../operators/SolutionSetDeltaOperator.java     |   75 -
 .../operators/SortMergeJoinDescriptor.java      |  110 --
 .../operators/UtilSinkJoinOpDescriptor.java     |   91 -
 .../optimizer/plan/BinaryUnionPlanNode.java     |   60 -
 .../optimizer/plan/BulkIterationPlanNode.java   |  168 --
 .../plan/BulkPartialSolutionPlanNode.java       |  127 --
 .../apache/flink/optimizer/plan/Channel.java    |  538 ------
 .../flink/optimizer/plan/DualInputPlanNode.java |  246 ---
 .../apache/flink/optimizer/plan/FlinkPlan.java  |   28 -
 .../flink/optimizer/plan/IterationPlanNode.java |   32 -
 .../flink/optimizer/plan/NAryUnionPlanNode.java |  106 --
 .../flink/optimizer/plan/NamedChannel.java      |   46 -
 .../flink/optimizer/plan/OptimizedPlan.java     |  130 --
 .../apache/flink/optimizer/plan/PlanNode.java   |  573 -------
 .../optimizer/plan/SingleInputPlanNode.java     |  271 ---
 .../optimizer/plan/SinkJoinerPlanNode.java      |   73 -
 .../flink/optimizer/plan/SinkPlanNode.java      |   50 -
 .../optimizer/plan/SolutionSetPlanNode.java     |  124 --
 .../flink/optimizer/plan/SourcePlanNode.java    |  113 --
 .../flink/optimizer/plan/StreamingPlan.java     |   38 -
 .../plan/WorksetIterationPlanNode.java          |  259 ---
 .../flink/optimizer/plan/WorksetPlanNode.java   |  131 --
 .../optimizer/plandump/DumpableConnection.java  |   32 -
 .../flink/optimizer/plandump/DumpableNode.java  |   41 -
 .../plandump/PlanJSONDumpGenerator.java         |  657 --------
 .../plantranslate/JobGraphGenerator.java        | 1578 ------------------
 .../optimizer/postpass/AbstractSchema.java      |   39 -
 .../ConflictingFieldTypeInfoException.java      |   50 -
 .../postpass/GenericFlatTypePostPass.java       |  579 -------
 .../optimizer/postpass/JavaApiPostPass.java     |  327 ----
 .../postpass/MissingFieldTypeInfoException.java |   34 -
 .../optimizer/postpass/OptimizerPostPass.java   |   37 -
 .../flink/optimizer/postpass/PostPassUtils.java |   47 -
 .../optimizer/postpass/RecordModelPostPass.java |  174 --
 .../optimizer/postpass/SparseKeySchema.java     |   86 -
 .../traversals/BinaryUnionReplacer.java         |  125 --
 .../optimizer/traversals/BranchesVisitor.java   |   46 -
 .../traversals/GraphCreatingVisitor.java        |  392 -----
 .../traversals/IdAndEstimatesVisitor.java       |   68 -
 .../traversals/InterestingPropertyVisitor.java  |   60 -
 .../optimizer/traversals/PlanFinalizer.java     |  229 ---
 .../traversals/StaticDynamicPathIdentifier.java |   58 -
 .../traversals/StepFunctionValidator.java       |   53 -
 .../optimizer/traversals/package-info.java      |   27 -
 .../flink/optimizer/util/NoOpBinaryUdfOp.java   |   51 -
 .../flink/optimizer/util/NoOpUnaryUdfOp.java    |   62 -
 .../org/apache/flink/optimizer/util/Utils.java  |   81 -
 .../optimizer/AdditionalOperatorsTest.java      |  110 --
 .../optimizer/BranchingPlansCompilerTest.java   | 1039 ------------
 .../BroadcastVariablePipelinebreakerTest.java   |   83 -
 .../CachedMatchStrategyCompilerTest.java        |  268 ---
 .../optimizer/CoGroupSolutionSetFirstTest.java  |  101 --
 .../flink/optimizer/CompilerTestBase.java       |  229 ---
 .../apache/flink/optimizer/DOPChangeTest.java   |  347 ----
 .../flink/optimizer/DisjointDataFlowsTest.java  |   51 -
 .../optimizer/DistinctCompilationTest.java      |  206 ---
 .../optimizer/FeedbackPropertiesMatchTest.java  | 1436 ----------------
 .../apache/flink/optimizer/GroupOrderTest.java  |  167 --
 .../optimizer/HardPlansCompilationTest.java     |   80 -
 .../flink/optimizer/IterationsCompilerTest.java |  409 -----
 .../flink/optimizer/NestedIterationsTest.java   |  181 --
 .../flink/optimizer/PartitionPushdownTest.java  |  104 --
 .../optimizer/PartitioningReusageTest.java      |  845 ----------
 .../flink/optimizer/PipelineBreakerTest.java    |  241 ---
 .../flink/optimizer/PropertyDataSourceTest.java |  897 ----------
 .../apache/flink/optimizer/ReduceAllTest.java   |   61 -
 .../optimizer/ReplicatingDataSourceTest.java    |  495 ------
 .../SemanticPropertiesAPIToPlanTest.java        |  173 --
 .../flink/optimizer/SortPartialReuseTest.java   |  130 --
 .../UnionBetweenDynamicAndStaticPathTest.java   |  143 --
 .../optimizer/UnionPropertyPropagationTest.java |  186 ---
 .../flink/optimizer/UnionReplacementTest.java   |   55 -
 .../WorksetIterationCornerCasesTest.java        |   77 -
 .../WorksetIterationsRecordApiCompilerTest.java |  247 ---
 .../costs/DefaultCostEstimatorTest.java         |  428 -----
 ...naryCustomPartitioningCompatibilityTest.java |  130 --
 .../CoGroupCustomPartitioningTest.java          |  312 ----
 ...ustomPartitioningGlobalOptimizationTest.java |   95 --
 .../custompartition/CustomPartitioningTest.java |  287 ----
 .../GroupingKeySelectorTranslationTest.java     |  234 ---
 .../GroupingPojoTranslationTest.java            |  257 ---
 .../GroupingTupleTranslationTest.java           |  270 ---
 .../JoinCustomPartitioningTest.java             |  309 ----
 .../DataExchangeModeClosedBranchingTest.java    |  257 ---
 .../DataExchangeModeForwardTest.java            |  139 --
 .../DataExchangeModeOpenBranchingTest.java      |  182 --
 .../dataexchange/PipelineBreakingTest.java      |  322 ----
 .../GlobalPropertiesFilteringTest.java          |  428 -----
 .../GlobalPropertiesMatchingTest.java           |  294 ----
 .../GlobalPropertiesPushdownTest.java           |  107 --
 .../LocalPropertiesFilteringTest.java           |  373 -----
 .../dataproperties/MockDistribution.java        |   50 -
 .../dataproperties/MockPartitioner.java         |   32 -
 .../RequestedGlobalPropertiesFilteringTest.java |  430 -----
 .../RequestedLocalPropertiesFilteringTest.java  |  246 ---
 .../java/DeltaIterationDependenciesTest.java    |   76 -
 .../java/DistinctAndGroupingOptimizerTest.java  |  112 --
 .../java/GroupReduceCompilationTest.java        |  368 ----
 .../optimizer/java/IterationCompilerTest.java   |  189 ---
 .../optimizer/java/JoinTranslationTest.java     |  168 --
 .../flink/optimizer/java/OpenIterationTest.java |  183 --
 .../optimizer/java/PartitionOperatorTest.java   |   70 -
 .../optimizer/java/ReduceCompilationTest.java   |  261 ---
 .../WorksetIterationsJavaApiCompilerTest.java   |  302 ----
 ...oGroupGlobalPropertiesCompatibilityTest.java |  161 --
 .../CoGroupOnConflictingPartitioningsTest.java  |   67 -
 .../JoinGlobalPropertiesCompatibilityTest.java  |  161 --
 .../JoinOnConflictingPartitioningsTest.java     |   65 -
 .../flink/optimizer/plan/ChannelTest.java       |   90 -
 .../plandump/NumberFormattingTest.java          |   53 -
 .../testfunctions/DummyCoGroupFunction.java     |   31 -
 .../testfunctions/DummyFlatJoinFunction.java    |   33 -
 .../optimizer/testfunctions/DummyReducer.java   |   31 -
 .../testfunctions/IdentityFlatMapper.java       |   30 -
 .../testfunctions/IdentityGroupReducer.java     |   38 -
 .../testfunctions/IdentityKeyExtractor.java     |   31 -
 .../optimizer/testfunctions/IdentityMapper.java |   32 -
 .../IdentityPartitionerMapper.java              |   34 -
 .../testfunctions/SelectOneReducer.java         |   31 -
 .../testfunctions/Top1GroupReducer.java         |   35 -
 .../flink/optimizer/util/DummyCoGroupStub.java  |   42 -
 .../flink/optimizer/util/DummyCrossStub.java    |   32 -
 .../flink/optimizer/util/DummyInputFormat.java  |   42 -
 .../flink/optimizer/util/DummyMatchStub.java    |   37 -
 .../util/DummyNonPreservingMatchStub.java       |   35 -
 .../flink/optimizer/util/DummyOutputFormat.java |   34 -
 .../flink/optimizer/util/IdentityMap.java       |   37 -
 .../flink/optimizer/util/IdentityReduce.java    |   40 -
 .../src/test/resources/log4j-test.properties    |   19 -
 .../src/test/resources/log4j.properties         |   27 -
 .../src/test/resources/logback-test.xml         |   29 -
 flink-dist/pom.xml                              |    2 +-
 flink-optimizer/pom.xml                         |   63 +
 .../flink/optimizer/CompilerException.java      |   64 +
 .../optimizer/CompilerPostPassException.java    |   62 +
 .../apache/flink/optimizer/DataStatistics.java  |   69 +
 .../org/apache/flink/optimizer/Optimizer.java   |  571 +++++++
 .../flink/optimizer/costs/CostEstimator.java    |  231 +++
 .../org/apache/flink/optimizer/costs/Costs.java |  492 ++++++
 .../optimizer/costs/DefaultCostEstimator.java   |  265 +++
 .../dag/AbstractPartialSolutionNode.java        |  104 ++
 .../flink/optimizer/dag/BinaryUnionNode.java    |  308 ++++
 .../flink/optimizer/dag/BulkIterationNode.java  |  390 +++++
 .../optimizer/dag/BulkPartialSolutionNode.java  |  103 ++
 .../apache/flink/optimizer/dag/CoGroupNode.java |  106 ++
 .../flink/optimizer/dag/CollectorMapNode.java   |   62 +
 .../apache/flink/optimizer/dag/CrossNode.java   |  138 ++
 .../flink/optimizer/dag/DagConnection.java      |  290 ++++
 .../flink/optimizer/dag/DataSinkNode.java       |  266 +++
 .../flink/optimizer/dag/DataSourceNode.java     |  306 ++++
 .../flink/optimizer/dag/EstimateProvider.java   |   47 +
 .../apache/flink/optimizer/dag/FilterNode.java  |   72 +
 .../apache/flink/optimizer/dag/FlatMapNode.java |   65 +
 .../flink/optimizer/dag/GroupCombineNode.java   |  100 ++
 .../flink/optimizer/dag/GroupReduceNode.java    |  158 ++
 .../dag/InterestingPropertiesClearer.java       |   39 +
 .../flink/optimizer/dag/IterationNode.java      |   30 +
 .../apache/flink/optimizer/dag/JoinNode.java    |  187 +++
 .../org/apache/flink/optimizer/dag/MapNode.java |   66 +
 .../flink/optimizer/dag/MapPartitionNode.java   |   67 +
 .../apache/flink/optimizer/dag/MatchNode.java   |  167 ++
 .../apache/flink/optimizer/dag/NoOpNode.java    |   44 +
 .../flink/optimizer/dag/OptimizerNode.java      | 1172 +++++++++++++
 .../flink/optimizer/dag/PartitionNode.java      |  148 ++
 .../flink/optimizer/dag/PlanCacheCleaner.java   |   39 +
 .../apache/flink/optimizer/dag/ReduceNode.java  |   98 ++
 .../flink/optimizer/dag/SingleInputNode.java    |  518 ++++++
 .../apache/flink/optimizer/dag/SinkJoiner.java  |  112 ++
 .../flink/optimizer/dag/SolutionSetNode.java    |   99 ++
 .../flink/optimizer/dag/SortPartitionNode.java  |  127 ++
 .../apache/flink/optimizer/dag/TempMode.java    |   83 +
 .../flink/optimizer/dag/TwoInputNode.java       |  747 +++++++++
 .../flink/optimizer/dag/UnaryOperatorNode.java  |   69 +
 .../optimizer/dag/WorksetIterationNode.java     |  589 +++++++
 .../apache/flink/optimizer/dag/WorksetNode.java |  104 ++
 .../dataproperties/GlobalProperties.java        |  500 ++++++
 .../dataproperties/InterestingProperties.java   |  179 ++
 .../dataproperties/LocalProperties.java         |  307 ++++
 .../dataproperties/PartitioningProperty.java    |  112 ++
 .../RequestedGlobalProperties.java              |  486 ++++++
 .../RequestedLocalProperties.java               |  265 +++
 .../optimizer/deadlockdetect/DeadlockEdge.java  |   38 +
 .../optimizer/deadlockdetect/DeadlockGraph.java |  133 ++
 .../deadlockdetect/DeadlockPreventer.java       |  211 +++
 .../deadlockdetect/DeadlockVertex.java          |   98 ++
 .../operators/AbstractJoinDescriptor.java       |  185 ++
 .../operators/AbstractOperatorDescriptor.java   |   35 +
 .../operators/AllGroupCombineProperties.java    |   73 +
 .../operators/AllGroupReduceProperties.java     |   73 +
 .../AllGroupWithPartialPreGroupProperties.java  |  100 ++
 .../operators/AllReduceProperties.java          |   94 ++
 .../operators/BinaryUnionOpDescriptor.java      |   97 ++
 .../operators/CartesianProductDescriptor.java   |  110 ++
 .../optimizer/operators/CoGroupDescriptor.java  |  239 +++
 .../CoGroupWithSolutionSetFirstDescriptor.java  |   77 +
 .../CoGroupWithSolutionSetSecondDescriptor.java |   57 +
 .../operators/CollectorMapDescriptor.java       |   75 +
 .../CrossBlockOuterFirstDescriptor.java         |   44 +
 .../CrossBlockOuterSecondDescriptor.java        |   44 +
 .../CrossStreamOuterFirstDescriptor.java        |   51 +
 .../CrossStreamOuterSecondDescriptor.java       |   51 +
 .../optimizer/operators/FilterDescriptor.java   |   68 +
 .../optimizer/operators/FlatMapDescriptor.java  |   75 +
 .../operators/GroupCombineProperties.java       |  117 ++
 .../operators/GroupReduceProperties.java        |  129 ++
 .../GroupReduceWithCombineProperties.java       |  169 ++
 .../operators/HashJoinBuildFirstProperties.java |   89 +
 .../HashJoinBuildSecondProperties.java          |   88 +
 .../optimizer/operators/MapDescriptor.java      |   68 +
 .../operators/MapPartitionDescriptor.java       |   68 +
 .../optimizer/operators/NoOpDescriptor.java     |   70 +
 .../operators/OperatorDescriptorDual.java       |  206 +++
 .../operators/OperatorDescriptorSingle.java     |  106 ++
 .../operators/PartialGroupProperties.java       |   91 +
 .../optimizer/operators/ReduceProperties.java   |  126 ++
 .../operators/SolutionSetDeltaOperator.java     |   75 +
 .../operators/SortMergeJoinDescriptor.java      |  110 ++
 .../operators/UtilSinkJoinOpDescriptor.java     |   91 +
 .../optimizer/plan/BinaryUnionPlanNode.java     |   60 +
 .../optimizer/plan/BulkIterationPlanNode.java   |  168 ++
 .../plan/BulkPartialSolutionPlanNode.java       |  127 ++
 .../apache/flink/optimizer/plan/Channel.java    |  538 ++++++
 .../flink/optimizer/plan/DualInputPlanNode.java |  246 +++
 .../apache/flink/optimizer/plan/FlinkPlan.java  |   28 +
 .../flink/optimizer/plan/IterationPlanNode.java |   32 +
 .../flink/optimizer/plan/NAryUnionPlanNode.java |  106 ++
 .../flink/optimizer/plan/NamedChannel.java      |   46 +
 .../flink/optimizer/plan/OptimizedPlan.java     |  130 ++
 .../apache/flink/optimizer/plan/PlanNode.java   |  573 +++++++
 .../optimizer/plan/SingleInputPlanNode.java     |  271 +++
 .../optimizer/plan/SinkJoinerPlanNode.java      |   73 +
 .../flink/optimizer/plan/SinkPlanNode.java      |   50 +
 .../optimizer/plan/SolutionSetPlanNode.java     |  124 ++
 .../flink/optimizer/plan/SourcePlanNode.java    |  113 ++
 .../flink/optimizer/plan/StreamingPlan.java     |   38 +
 .../plan/WorksetIterationPlanNode.java          |  259 +++
 .../flink/optimizer/plan/WorksetPlanNode.java   |  131 ++
 .../optimizer/plandump/DumpableConnection.java  |   32 +
 .../flink/optimizer/plandump/DumpableNode.java  |   41 +
 .../plandump/PlanJSONDumpGenerator.java         |  657 ++++++++
 .../plantranslate/JobGraphGenerator.java        | 1578 ++++++++++++++++++
 .../optimizer/postpass/AbstractSchema.java      |   39 +
 .../ConflictingFieldTypeInfoException.java      |   50 +
 .../postpass/GenericFlatTypePostPass.java       |  579 +++++++
 .../optimizer/postpass/JavaApiPostPass.java     |  327 ++++
 .../postpass/MissingFieldTypeInfoException.java |   34 +
 .../optimizer/postpass/OptimizerPostPass.java   |   37 +
 .../flink/optimizer/postpass/PostPassUtils.java |   47 +
 .../optimizer/postpass/RecordModelPostPass.java |  174 ++
 .../optimizer/postpass/SparseKeySchema.java     |   86 +
 .../traversals/BinaryUnionReplacer.java         |  125 ++
 .../optimizer/traversals/BranchesVisitor.java   |   46 +
 .../traversals/GraphCreatingVisitor.java        |  392 +++++
 .../traversals/IdAndEstimatesVisitor.java       |   68 +
 .../traversals/InterestingPropertyVisitor.java  |   60 +
 .../optimizer/traversals/PlanFinalizer.java     |  229 +++
 .../traversals/StaticDynamicPathIdentifier.java |   58 +
 .../traversals/StepFunctionValidator.java       |   53 +
 .../optimizer/traversals/package-info.java      |   27 +
 .../flink/optimizer/util/NoOpBinaryUdfOp.java   |   51 +
 .../flink/optimizer/util/NoOpUnaryUdfOp.java    |   62 +
 .../org/apache/flink/optimizer/util/Utils.java  |   81 +
 .../optimizer/AdditionalOperatorsTest.java      |  110 ++
 .../optimizer/BranchingPlansCompilerTest.java   | 1039 ++++++++++++
 .../BroadcastVariablePipelinebreakerTest.java   |   83 +
 .../CachedMatchStrategyCompilerTest.java        |  268 +++
 .../optimizer/CoGroupSolutionSetFirstTest.java  |  101 ++
 .../flink/optimizer/CompilerTestBase.java       |  229 +++
 .../apache/flink/optimizer/DOPChangeTest.java   |  347 ++++
 .../flink/optimizer/DisjointDataFlowsTest.java  |   51 +
 .../optimizer/DistinctCompilationTest.java      |  206 +++
 .../optimizer/FeedbackPropertiesMatchTest.java  | 1436 ++++++++++++++++
 .../apache/flink/optimizer/GroupOrderTest.java  |  167 ++
 .../optimizer/HardPlansCompilationTest.java     |   80 +
 .../flink/optimizer/IterationsCompilerTest.java |  409 +++++
 .../flink/optimizer/NestedIterationsTest.java   |  181 ++
 .../flink/optimizer/PartitionPushdownTest.java  |  104 ++
 .../optimizer/PartitioningReusageTest.java      |  845 ++++++++++
 .../flink/optimizer/PipelineBreakerTest.java    |  241 +++
 .../flink/optimizer/PropertyDataSourceTest.java |  897 ++++++++++
 .../apache/flink/optimizer/ReduceAllTest.java   |   61 +
 .../optimizer/ReplicatingDataSourceTest.java    |  495 ++++++
 .../SemanticPropertiesAPIToPlanTest.java        |  173 ++
 .../flink/optimizer/SortPartialReuseTest.java   |  130 ++
 .../UnionBetweenDynamicAndStaticPathTest.java   |  143 ++
 .../optimizer/UnionPropertyPropagationTest.java |  186 +++
 .../flink/optimizer/UnionReplacementTest.java   |   55 +
 .../WorksetIterationCornerCasesTest.java        |   77 +
 .../WorksetIterationsRecordApiCompilerTest.java |  247 +++
 .../costs/DefaultCostEstimatorTest.java         |  428 +++++
 ...naryCustomPartitioningCompatibilityTest.java |  130 ++
 .../CoGroupCustomPartitioningTest.java          |  312 ++++
 ...ustomPartitioningGlobalOptimizationTest.java |   95 ++
 .../custompartition/CustomPartitioningTest.java |  287 ++++
 .../GroupingKeySelectorTranslationTest.java     |  234 +++
 .../GroupingPojoTranslationTest.java            |  257 +++
 .../GroupingTupleTranslationTest.java           |  270 +++
 .../JoinCustomPartitioningTest.java             |  309 ++++
 .../DataExchangeModeClosedBranchingTest.java    |  257 +++
 .../DataExchangeModeForwardTest.java            |  139 ++
 .../DataExchangeModeOpenBranchingTest.java      |  182 ++
 .../dataexchange/PipelineBreakingTest.java      |  322 ++++
 .../GlobalPropertiesFilteringTest.java          |  428 +++++
 .../GlobalPropertiesMatchingTest.java           |  294 ++++
 .../GlobalPropertiesPushdownTest.java           |  107 ++
 .../LocalPropertiesFilteringTest.java           |  373 +++++
 .../dataproperties/MockDistribution.java        |   50 +
 .../dataproperties/MockPartitioner.java         |   32 +
 .../RequestedGlobalPropertiesFilteringTest.java |  430 +++++
 .../RequestedLocalPropertiesFilteringTest.java  |  246 +++
 .../java/DeltaIterationDependenciesTest.java    |   76 +
 .../java/DistinctAndGroupingOptimizerTest.java  |  112 ++
 .../java/GroupReduceCompilationTest.java        |  368 ++++
 .../optimizer/java/IterationCompilerTest.java   |  189 +++
 .../optimizer/java/JoinTranslationTest.java     |  168 ++
 .../flink/optimizer/java/OpenIterationTest.java |  183 ++
 .../optimizer/java/PartitionOperatorTest.java   |   70 +
 .../optimizer/java/ReduceCompilationTest.java   |  261 +++
 .../WorksetIterationsJavaApiCompilerTest.java   |  302 ++++
 ...oGroupGlobalPropertiesCompatibilityTest.java |  161 ++
 .../CoGroupOnConflictingPartitioningsTest.java  |   67 +
 .../JoinGlobalPropertiesCompatibilityTest.java  |  161 ++
 .../JoinOnConflictingPartitioningsTest.java     |   65 +
 .../flink/optimizer/plan/ChannelTest.java       |   90 +
 .../plandump/NumberFormattingTest.java          |   53 +
 .../testfunctions/DummyCoGroupFunction.java     |   31 +
 .../testfunctions/DummyFlatJoinFunction.java    |   33 +
 .../optimizer/testfunctions/DummyReducer.java   |   31 +
 .../testfunctions/IdentityFlatMapper.java       |   30 +
 .../testfunctions/IdentityGroupReducer.java     |   38 +
 .../testfunctions/IdentityKeyExtractor.java     |   31 +
 .../optimizer/testfunctions/IdentityMapper.java |   32 +
 .../IdentityPartitionerMapper.java              |   34 +
 .../testfunctions/SelectOneReducer.java         |   31 +
 .../testfunctions/Top1GroupReducer.java         |   35 +
 .../flink/optimizer/util/DummyCoGroupStub.java  |   42 +
 .../flink/optimizer/util/DummyCrossStub.java    |   32 +
 .../flink/optimizer/util/DummyInputFormat.java  |   42 +
 .../flink/optimizer/util/DummyMatchStub.java    |   37 +
 .../util/DummyNonPreservingMatchStub.java       |   35 +
 .../flink/optimizer/util/DummyOutputFormat.java |   34 +
 .../flink/optimizer/util/IdentityMap.java       |   37 +
 .../flink/optimizer/util/IdentityReduce.java    |   40 +
 .../src/test/resources/log4j-test.properties    |   19 +
 .../src/test/resources/log4j.properties         |   27 +
 .../src/test/resources/logback-test.xml         |   29 +
 .../main/resources/archetype-resources/pom.xml  |    2 +-
 .../main/resources/archetype-resources/pom.xml  |    2 +-
 flink-scala/pom.xml                             |    2 +-
 flink-staging/flink-streaming/pom.xml           |    6 -
 flink-test-utils/pom.xml                        |    2 +-
 flink-tests/pom.xml                             |    2 +-
 pom.xml                                         |    2 +-
 437 files changed, 38940 insertions(+), 38946 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 95d17d7..6b6d19d 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -50,7 +50,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-compiler</artifactId>
+			<artifactId>flink-optimizer</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/pom.xml b/flink-compiler/pom.xml
deleted file mode 100644
index 46d5a9e..0000000
--- a/flink-compiler/pom.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-parent</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-compiler</artifactId>
-	<name>flink-compiler</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
deleted file mode 100644
index 2f99ddb..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerException.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-/**
- * An exception that is thrown by the Optimizer when encountering an illegal condition.
- */
-public class CompilerException extends RuntimeException {
-
-	private static final long serialVersionUID = 3810067304570563755L;
-
-	/**
-	 * Creates a compiler exception with no message and no cause.
-	 */
-	public CompilerException() {}
-
-	/**
-	 * Creates a compiler exception with the given message and no cause.
-	 * 
-	 * @param message
-	 *        The message for the exception.
-	 */
-	public CompilerException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a compiler exception with the given cause and no message.
-	 * 
-	 * @param cause
-	 *        The <tt>Throwable</tt> that caused this exception.
-	 */
-	public CompilerException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and cause.
-	 * 
-	 * @param message
-	 *        The message for the exception.
-	 * @param cause
-	 *        The <tt>Throwable</tt> that caused this exception.
-	 */
-	public CompilerException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
deleted file mode 100644
index 78e47a0..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/CompilerPostPassException.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-/**
- * An exception that is thrown by the Optimizer when encountering
- * a problem during the optimizer post pass. This is a dedicated exception
- * because it is thrown by user-specified optimizer extensions.
- */
-public class CompilerPostPassException extends CompilerException {
-
-	private static final long serialVersionUID = -322650826288034623L;
-
-	/**
-	 * Creates a post pass exception with no message and no cause.
-	 */
-	public CompilerPostPassException() {}
-
-	/**
-	 * Creates a post pass exception with the given message and no cause.
-	 * 
-	 * @param message The message for the exception.
-	 */
-	public CompilerPostPassException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a post pass exception with the given cause and no message.
-	 * 
-	 * @param cause The <tt>Throwable</tt> that caused this exception.
-	 */
-	public CompilerPostPassException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a post pass exception with the given message and cause.
-	 * 
-	 * @param message The message for the exception.
-	 * @param cause The <tt>Throwable</tt> that caused this exception.
-	 */
-	public CompilerPostPassException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
deleted file mode 100644
index cf6f4ec..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/DataStatistics.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-
-/**
- * The collection of access methods that can be used to retrieve statistical information about the
- * data processed in a job. Currently this method acts as an entry point only for obtaining cached
- * statistics.
- */
-public class DataStatistics {
-	
-	private final Map<String, BaseStatistics> baseStatisticsCache;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new statistics object, with an empty cache. 
-	 */
-	public DataStatistics() {
-		this.baseStatisticsCache = new HashMap<String, BaseStatistics>();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the base statistics for the input identified by the given identifier.
-	 *  
-	 * @param inputIdentifier The identifier for the input.
-	 * @return The statistics that were cached for this input.
-	 */
-	public BaseStatistics getBaseStatistics(String inputIdentifier) {
-		synchronized (this.baseStatisticsCache) {
-			return this.baseStatisticsCache.get(inputIdentifier);
-		}
-	}
-	
-	/**
-	 * Caches the given statistics. They are later retrievable under the given identifier.
-	 * 
-	 * @param statistics The statistics to cache.
-	 * @param identifier The identifier which may be later used to retrieve the statistics.
-	 */
-	public void cacheBaseStatistics(BaseStatistics statistics, String identifier) {
-		synchronized (this.baseStatisticsCache) {
-			this.baseStatisticsCache.put(identifier, statistics);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
deleted file mode 100644
index 2101428..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;
-import org.apache.flink.optimizer.traversals.BranchesVisitor;
-import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
-import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
-import org.apache.flink.optimizer.traversals.PlanFinalizer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.postpass.OptimizerPostPass;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * The optimizer that takes the user specified program plan and creates an optimized plan that contains
- * exact descriptions about how the physical execution will take place. It first translates the user
- * program into an internal optimizer representation and then chooses between different alternatives
- * for shipping strategies and local strategies.
- * <p>
- * The basic principle is taken from optimizer works in systems such as Volcano/Cascades and Selinger/System-R/DB2. The
- * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources generating
- * alternative plans, pruning against the interesting properties.
- * <p>
- * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
- * sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
- */
-public class Optimizer {
-
-	// ------------------------------------------------------------------------
-	// Constants
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
-	 * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
-	 * If the operator has two input channels, the shipping strategy is applied to both input channels.
-	 */
-	public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
-
-	/**
-	 * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
-	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
-	 * to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
-	 */
-	public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
-
-	/**
-	 * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
-	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
-	 * to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
-	 */
-	public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
-
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
-	 * input channel, i.e. no redistribution of any kind.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a random repartition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION= "SHIP_REPARTITION";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
-	
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a range-partition strategy.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
-
-	/**
-	 * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
-	 * input channel.
-	 * 
-	 * @see #HINT_SHIP_STRATEGY
-	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-	 */
-	public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
-
-	/**
-	 * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
-	 * parameters. The corresponding value tells the compiler which local strategy to use to process the
-	 * data inside one partition.
-	 * <p>
-	 * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
-	 * have no choice in their local strategy (such as <i>Cross</i>).
-	 */
-	public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
-
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
-	 * For example, a <i>Reduce</i> operator will sort the data to group it.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
-	 * During sorting a combine method is repeatedly applied to reduce the data volume.
-	 * For example, a <i>Reduce</i> operator will sort the data to group it.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
-	 * inputs with subsequent merging of inputs. 
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
-	 * The first input is sorted, the second input is assumed to be sorted. After sorting both inputs are merged.
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
-	 * The second input is sorted, the first input is assumed to be sorted. After sorting both inputs are merged.
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
-	 * Both inputs are assumed to be sorted and are merged. 
-	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs 
-	 * of matching keys.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
-
-	
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
-	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
-	 * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
-	 * used to probe the table.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
-	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
-	 * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
-	 * used to probe the table.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
-	 * Hence, the data of the first input will be is streamed though, while the data of the second input is stored on
-	 * disk
-	 * and repeatedly read.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
-	 * Hence, the data of the second input will be is streamed though, while the data of the first input is stored on
-	 * disk
-	 * and repeatedly read.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
-	 * Further more, the first input, being the outer side, will be processed in blocks, and for each block, the second
-	 * input,
-	 * being the inner side, will read repeatedly from disk.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
-
-	/**
-	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
-	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
-	 * Further more, the second input, being the outer side, will be processed in blocks, and for each block, the first
-	 * input,
-	 * being the inner side, will read repeatedly from disk.
-	 * 
-	 * @see #HINT_LOCAL_STRATEGY
-	 */
-	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
-	
-	/**
-	 * The log handle that is used by the compiler to log messages.
-	 */
-	public static final Logger LOG = LoggerFactory.getLogger(Optimizer.class);
-
-	// ------------------------------------------------------------------------
-	// Members
-	// ------------------------------------------------------------------------
-
-	/**
-	 * The statistics object used to obtain statistics, such as input sizes,
-	 * for the cost estimation process.
-	 */
-	private final DataStatistics statistics;
-
-	/**
-	 * The cost estimator used by the compiler.
-	 */
-	private final CostEstimator costEstimator;
-
-	/**
-	 * The default degree of parallelism for jobs compiled by this compiler.
-	 */
-	private int defaultDegreeOfParallelism;
-
-
-	// ------------------------------------------------------------------------
-	// Constructor & Setup
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
-	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
-	 * of the most robust execution strategies.
-	 */
-	public Optimizer() {
-		this(null, new DefaultCostEstimator());
-	}
-
-	/**
-	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the optimizer can make better choices for the execution strategies.
-	 * 
-	 * @param stats
-	 *        The statistics to be used to determine the input properties.
-	 */
-	public Optimizer(DataStatistics stats) {
-		this(stats, new DefaultCostEstimator());
-	}
-
-	/**
-	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
-	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
-	 * of the most robust execution strategies.
-	 *
-	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
-	 * 
-	 * @param estimator The cost estimator to use to cost the individual operations.
-	 */
-	public Optimizer(CostEstimator estimator) {
-		this(null, estimator);
-	}
-
-	/**
-	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the optimizer can make better choices for the execution strategies.
-	 *
-	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
-	 * 
-	 * @param stats
-	 *        The statistics to be used to determine the input properties.
-	 * @param estimator
-	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
-	 */
-	public Optimizer(DataStatistics stats, CostEstimator estimator) {
-		this.statistics = stats;
-		this.costEstimator = estimator;
-
-		// determine the default parallelism
-		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-		
-		if (defaultDegreeOfParallelism < 1) {
-			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
-					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
-			this.defaultDegreeOfParallelism = 1;
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//                             Getters / Setters
-	// ------------------------------------------------------------------------
-	
-	public int getDefaultDegreeOfParallelism() {
-		return defaultDegreeOfParallelism;
-	}
-	
-	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
-		if (defaultDegreeOfParallelism > 0) {
-			this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
-		} else {
-			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//                               Compilation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
-	 * and all channels have a shipping strategy assigned.
-	 *
-	 * For more details on the optimization phase, see the comments for
-	 * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
-	 * 
-	 * @param program The program to be translated.
-	 * @return The optimized plan.
-	 *
-	 * @throws CompilerException
-	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
-	 *         situation during the compilation process.
-	 */
-	public OptimizedPlan compile(Plan program) throws CompilerException {
-		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
-		return compile(program, postPasser);
-	}
-
-	/**
-	 * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
-	 * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
-	 * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
-	 * where to cache intermediate results, etc,
-	 *
-	 * The optimization happens in multiple phases:
-	 * <ol>
-	 *     <li>Create optimizer dag implementation of the program.
-	 *
-	 *     <tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute size estimates.</li>
-	 * <li>Compute interesting properties and auxiliary structures.</li>
-	 * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
-	 * opposed to the Database approaches), because we support plans that are not trees.</li>
-	 * </ol>
-	 * 
-	 * @param program The program to be translated.
-	 * @param postPasser The function to be used for post passing the optimizer's plan and setting the
-	 *                   data type specific serialization routines.
-	 * @return The optimized plan.
-	 * 
-	 * @throws CompilerException
-	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
-	 *         situation during the compilation process.
-	 */
-	private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
-		if (program == null || postPasser == null) {
-			throw new NullPointerException();
-		}
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
-		}
-
-		final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
-
-		final int defaultParallelism = program.getDefaultParallelism() > 0 ?
-			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-
-		// log the default settings
-		LOG.debug("Using a default parallelism of {}",  defaultParallelism);
-		LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
-
-		// the first step in the compilation is to create the optimizer plan representation
-		// this step does the following:
-		// 1) It creates an optimizer plan node for each operator
-		// 2) It connects them via channels
-		// 3) It looks for hints about local strategies and channel types and
-		// sets the types and strategies accordingly
-		// 4) It makes estimates about the data volume of the data sources and
-		// propagates those estimates through the plan
-
-		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
-		program.accept(graphCreator);
-
-		// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
-		// each until we have only a single root node. This allows to transparently deal with the nodes with
-		// multiple outputs
-		OptimizerNode rootNode;
-		if (graphCreator.getSinks().size() == 1) {
-			rootNode = graphCreator.getSinks().get(0);
-		}
-		else if (graphCreator.getSinks().size() > 1) {
-			Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
-			rootNode = iter.next();
-
-			while (iter.hasNext()) {
-				rootNode = new SinkJoiner(rootNode, iter.next());
-			}
-		}
-		else {
-			throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
-		}
-
-		// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
-		// guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks
-		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
-
-		// We are dealing with operator DAGs, rather than operator trees.
-		// That requires us to deviate at some points from the classical DB optimizer algorithms.
-		// This step build some auxiliary structures to help track branches and joins in the DAG
-		BranchesVisitor branchingVisitor = new BranchesVisitor();
-		rootNode.accept(branchingVisitor);
-
-		// Propagate the interesting properties top-down through the graph
-		InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
-		rootNode.accept(propsVisitor);
-		
-		// perform a sanity check: the root may not have any unclosed branches
-		if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
-			throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
-					"track the re-joining of branches correctly.");
-		}
-
-		// the final step is now to generate the actual plan alternatives
-		List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
-
-		if (bestPlan.size() != 1) {
-			throw new CompilerException("Error in compiler: more than one best plan was created!");
-		}
-
-		// check if the best plan's root is a data sink (single sink plan)
-		// if so, directly take it. if it is a sink joiner node, get its contained sinks
-		PlanNode bestPlanRoot = bestPlan.get(0);
-		List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
-
-		if (bestPlanRoot instanceof SinkPlanNode) {
-			bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
-		} else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
-			((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
-		}
-		
-		DeadlockPreventer dp = new DeadlockPreventer();
-		dp.resolveDeadlocks(bestPlanSinks);
-
-		// finalize the plan
-		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
-		
-		plan.accept(new BinaryUnionReplacer());
-		
-		// post pass the plan. this is the phase where the serialization and comparator code is set
-		postPasser.postPass(plan);
-		
-		return plan;
-	}
-
-	/**
-	 * This function performs only the first step to the compilation process - the creation of the optimizer
-	 * representation of the plan. No estimations or enumerations of alternatives are done here.
-	 * 
-	 * @param program The plan to generate the optimizer representation for.
-	 * @return The optimizer representation of the plan, as a collection of all data sinks
-	 *         from the plan can be traversed.
-	 */
-	public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
-		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
-		program.accept(graphCreator);
-		return graphCreator.getSinks();
-	}
-
-
-	// ------------------------------------------------------------------------
-	// Miscellaneous
-	// ------------------------------------------------------------------------
-	
-	private OptimizerPostPass getPostPassFromPlan(Plan program) {
-		final String className =  program.getPostPassClassName();
-		if (className == null) {
-			throw new CompilerException("Optimizer Post Pass class description is null");
-		}
-		try {
-			Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
-			try {
-				return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
-			} catch (RuntimeException rtex) {
-				// unwrap the source exception
-				if (rtex.getCause() != null) {
-					throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
-				} else {
-					throw rtex;
-				}
-			}
-		}
-		catch (ClassNotFoundException cnfex) {
-			throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
-		}
-		catch (ClassCastException ccex) {
-			throw new CompilerException("Class '" + className + "' is not an optimizer post-pass.", ccex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
deleted file mode 100644
index 7880734..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.costs;
-
-import java.util.Iterator;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.EstimateProvider;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-
-/**
- * Abstract base class for a cost estimator. Defines cost estimation methods and implements the basic work
- * method that computes the cost of an operator by adding input shipping cost, input local cost, and
- * driver cost.
- */
-public abstract class CostEstimator {
-	
-	public abstract void addRandomPartitioningCost(EstimateProvider estimates, Costs costs);
-	
-	public abstract void addHashPartitioningCost(EstimateProvider estimates, Costs costs);
-	
-	public abstract void addRangePartitionCost(EstimateProvider estimates, Costs costs);
-
-	public abstract void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs);
-
-	// ------------------------------------------------------------------------
-
-	public abstract void addFileInputCost(long fileSizeInBytes, Costs costs);
-	
-	public abstract void addLocalSortCost(EstimateProvider estimates, Costs costs);
-	
-	public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, Costs costs, int costWeight);
-	
-	public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
-	
-	public abstract void addCachedHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
-
-	public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight);
-
-	public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight);
-
-	// ------------------------------------------------------------------------
-	
-	public abstract void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Costs costs);
-	
-	// ------------------------------------------------------------------------	
-
-	/**
-	 * Computes the cost of an operator. The cost is composed of the cost for shipping each input,
-	 * the cost for locally processing each input, and the cost for running the operator's driver.
-	 * 
-	 * It requires at least that all inputs are set and have a proper ship strategy set,
-	 * which is not equal to <tt>NONE</tt>; otherwise a CompilerException is thrown.
-	 * 
-	 * @param n The node to compute the costs for. The resulting total is stored on the node via setCosts.
-	 */
-	public void costOperator(PlanNode n) {
-		// initialize costs objects with no costs
-		final Costs totalCosts = new Costs();
-		final long availableMemory = n.getGuaranteedAvailableMemory(); // used below as the block/buffer size of the nested-loops strategies
-		
-		// add the shipping strategy costs
-		for (Channel channel : n.getInputs()) {
-			final Costs costs = new Costs();
-			
-			// Plans that apply the same strategies, but at different points
-			// are equally expensive. For example, if a partitioning can be
-			// pushed below a Map function there is often no difference in plan
-			// costs between the pushed down version and the version that partitions
-			// after the Mapper. However, in those cases, we want the expensive
-			// strategy to appear later in the plan, as data reduction often occurs
-			// by large factors, while blowup is rare and typically by smaller fractions.
-			// We achieve this by adding a small penalty to the FORWARD strategy,
-			// weighted by the current plan depth (steps to the earliest data source).
-			// That way, later FORWARDs are more expensive than earlier FORWARDs.
-			// Note that this only applies to the heuristic costs.
-			
-			switch (channel.getShipStrategy()) {
-			case NONE:
-				throw new CompilerException(
-					"Cannot determine costs: Shipping strategy has not been set for an input.");
-			case FORWARD:
-//				costs.addHeuristicNetworkCost(channel.getMaxDepth());
-				break; // NOTE(review): the depth-weighted FORWARD penalty described above is commented out, so FORWARD currently adds no cost
-			case PARTITION_RANDOM:
-				addRandomPartitioningCost(channel, costs);
-				break;
-			case PARTITION_HASH:
-			case PARTITION_CUSTOM:
-				addHashPartitioningCost(channel, costs); // custom partitioning is costed like hash partitioning
-				break;
-			case PARTITION_RANGE:
-				addRangePartitionCost(channel, costs);
-				break;
-			case BROADCAST:
-				addBroadcastCost(channel, channel.getReplicationFactor(), costs);
-				break;
-			case PARTITION_FORCED_REBALANCE:
-				addRandomPartitioningCost(channel, costs); // forced rebalance is costed like random partitioning
-				break;
-			default:
-				throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
-			}
-			
-			switch (channel.getLocalStrategy()) {
-			case NONE:
-				break;
-			case SORT:
-			case COMBININGSORT:
-				addLocalSortCost(channel, costs); // a combining sort is costed like a plain sort
-				break;
-			default:
-				throw new CompilerException("Unsupported local strategy for input: " + channel.getLocalStrategy());
-			}
-			
-			if (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE) {
-				addArtificialDamCost(channel, 0, costs); // the buffer size argument is passed as 0 here
-			}
-			
-			// adjust with the cost weight factor
-			if (channel.isOnDynamicPath()) {
-				costs.multiplyWith(channel.getCostWeight());
-			}
-			
-			totalCosts.addCosts(costs);
-		} 
-		
-		Channel firstInput = null; // stays null if the node has no inputs
-		Channel secondInput = null; // stays null if the node has fewer than two inputs
-		Costs driverCosts = new Costs();
-		int costWeight = 1; // default weight, used when the node is not on a dynamic path
-		
-		// adjust with the cost weight factor
-		if (n.isOnDynamicPath()) {
-			costWeight = n.getCostWeight();
-		}
-		
-		// get the inputs, if we have some
-		{
-			Iterator<Channel> channels = n.getInputs().iterator();
-			if (channels.hasNext()) {
-				firstInput = channels.next();
-			}
-			if (channels.hasNext()) {
-				secondInput = channels.next();
-			}
-		}
-
-		// determine the local costs
-		switch (n.getDriverStrategy()) {
-		case NONE:
-		case UNARY_NO_OP:
-		case BINARY_NO_OP:	
-		case COLLECTOR_MAP:
-		case MAP:
-		case MAP_PARTITION:
-		case FLAT_MAP:
-			
-		case ALL_GROUP_REDUCE:
-		case ALL_REDUCE:
-			// these operations do not do any actual grouping, since every element is in the same single group
-			
-		case CO_GROUP:
-		case SORTED_GROUP_REDUCE:
-		case SORTED_REDUCE:
-			// grouping or co-grouping over sorted streams for free
-			
-		case SORTED_GROUP_COMBINE:
-			// partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
-
-			// the same holds for the combine over all elements: local and main memory resident, no cost added yet
-		case ALL_GROUP_COMBINE:
-			
-		case UNION:
-			// pipelined local union is for free
-			
-			break; // every strategy listed above falls through to here and adds no driver cost
-		case MERGE:
-			addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight); // NOTE(review): here and in the cases below the inputs are passed as-is and are null if the node has fewer than two inputs
-			break;
-		case HYBRIDHASH_BUILD_FIRST:
-			addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
-			break;
-		case HYBRIDHASH_BUILD_SECOND:
-			addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight); // arguments swapped: the second input is the build side
-			break;
-		case HYBRIDHASH_BUILD_FIRST_CACHED:
-			addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
-			break;
-		case HYBRIDHASH_BUILD_SECOND_CACHED:
-			addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight); // arguments swapped: the second input is the build side
-			break;
-		case NESTEDLOOP_BLOCKED_OUTER_FIRST:
-			addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
-			break;
-		case NESTEDLOOP_BLOCKED_OUTER_SECOND:
-			addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight); // arguments swapped: the second input is the outer side
-			break;
-		case NESTEDLOOP_STREAMED_OUTER_FIRST:
-			addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
-			break;
-		case NESTEDLOOP_STREAMED_OUTER_SECOND:
-			addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight); // arguments swapped: the second input is the outer side
-			break;
-		default:
-			throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name()); // NOTE(review): the message says "local strategy" but the value reported is the driver strategy
-		}
-		
-		totalCosts.addCosts(driverCosts); // total = input (shipping + local + dam) costs plus driver costs
-		n.setCosts(totalCosts);
-	}
-}


Mime
View raw message