flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-3225] Enforce translation to DataSetNodes
Date Mon, 01 Feb 2016 22:47:26 GMT
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 7ecb70105 -> 297564646


[FLINK-3225] Enforce translation to DataSetNodes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29756464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29756464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29756464

Branch: refs/heads/tableOnCalcite
Commit: 2975646469bcc6b29b36e5d8bb61663af6da8749
Parents: 7ecb701
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Feb 1 23:45:16 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Feb 1 23:47:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/table/JavaBatchTranslator.scala  | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29756464/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 66bfbe7..7e91190 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
@@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator {
     // get the planner for the plan
     val planner = lPlan.getCluster.getPlanner
 
-    // we do not have any special requirements for the output
-    val outputProps = RelTraitSet.createEmpty()
 
     println("-----------")
     println("Input Plan:")
     println("-----------")
     println(RelOptUtil.toString(lPlan))
-
+    
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
 
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-    val optPlan = optProgram.run(planner, decorPlan, outputProps)
+    val flinkOutputProps = RelTraitSet.createEmpty()
+    val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
 
     println("---------------")
     println("Optimized Plan:")
@@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator {
 
     // optimize the logical Flink plan
     val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-    val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps)
+    val dataSetOutputProps = RelTraitSet.createEmpty()
+      .plus(DataSetConvention.INSTANCE)
+      .plus(RelCollations.of()).simplify()
+    val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
 
     println("-------------")
     println("DataSet Plan:")


Mime
View raw message