flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-7005] [table] Optimization steps are missing for nested registered tables
Date Tue, 27 Jun 2017 13:14:28 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 f39899da1 -> 58b3b19c9


[FLINK-7005] [table] Optimization steps are missing for nested registered tables

This closes #4186.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58b3b19c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58b3b19c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58b3b19c

Branch: refs/heads/release-1.3
Commit: 58b3b19c91539f11d3d2636090e3bec1dd7b75d0
Parents: f39899d
Author: twalthr <twalthr@apache.org>
Authored: Mon Jun 26 15:22:11 2017 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Tue Jun 27 15:09:38 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  6 +++-
 .../table/api/StreamTableEnvironment.scala      |  6 +++-
 .../flink/table/plan/rules/FlinkRuleSets.scala  | 11 +++---
 .../flink/table/ExpressionReductionTest.scala   | 36 ++++++++++++++++++++
 .../plan/rules/NormalizationRulesTest.scala     |  4 +--
 5 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58b3b19c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 3c0f51b..2cdb27c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -276,8 +276,12 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
 
+    // 0. convert registered tables
+    val fullRelNode = runHepPlanner(
+      HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_CONV_RULES, relNode, relNode.getTraitSet)
+
     // 1. decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
+    val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
 
     // 2. normalize the logical plan
     val normRuleSet = getNormRuleSet

http://git-wip-us.apache.org/repos/asf/flink/blob/58b3b19c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index eb3eb5c..c5a66b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -559,8 +559,12 @@ abstract class StreamTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
 
+    // 0. convert registered tables
+    val fullRelNode = runHepPlanner(
+      HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_CONV_RULES, relNode, relNode.getTraitSet)
+
     // 1. decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
+    val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
 
     // 2. convert time indicators
     val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

http://git-wip-us.apache.org/repos/asf/flink/blob/58b3b19c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 980dfd3..bb3833a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -28,11 +28,14 @@ import org.apache.flink.table.plan.nodes.logical._
 
 object FlinkRuleSets {
 
-  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
-
-    // convert a logical table scan to a relational expression
+  /**
+    * Convert a logical table scan to a relational expression.
+    */
+  val TABLE_CONV_RULES: RuleSet = RuleSets.ofList(
     TableScanRule.INSTANCE,
-    EnumerableToLogicalTableScan.INSTANCE,
+    EnumerableToLogicalTableScan.INSTANCE)
+
+  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
 
     // push a filter into a join
     FilterJoinRule.FILTER_ON_JOIN,

http://git-wip-us.apache.org/repos/asf/flink/blob/58b3b19c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
index 314d863..59eff4b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -422,4 +422,40 @@ class ExpressionReductionTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
+  @Test
+  def testNestedTablesReductionStream(): Unit = {
+    val util = streamTestUtil()
+
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val newTable = util.tEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+
+    util.tEnv.registerTable("NewTable", newTable)
+
+    val sqlQuery = "SELECT a FROM NewTable"
+
+    // 1+1 should be normalized to 2
+    val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testNestedTablesReductionBatch(): Unit = {
+    val util = batchTestUtil()
+
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val newTable = util.tEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+
+    util.tEnv.registerTable("NewTable", newTable)
+
+    val sqlQuery = "SELECT a FROM NewTable"
+
+    // 1+1 should be normalized to 2
+    val expected = unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58b3b19c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
index b563a8b..5bd7e77 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
@@ -51,7 +51,7 @@ class NormalizationRulesTest extends TableTestBase {
       unaryNode("LogicalAggregate",
         unaryNode("LogicalAggregate",
           unaryNode("LogicalProject",
-            values("LogicalTableScan", term("table", "[MyTable]")),
+            values("LogicalTableScan", term("table", "[_DataSetTable_0]")),
             term("b", "$1"), term("a", "$0")),
           term("group", "{0, 1}")),
         term("group", "{0}"), term("EXPR$0", "COUNT($1)")
@@ -86,7 +86,7 @@ class NormalizationRulesTest extends TableTestBase {
       unaryNode("LogicalAggregate",
         unaryNode("LogicalAggregate",
           unaryNode("LogicalProject",
-            values("LogicalTableScan", term("table", "[MyTable]")),
+            values("LogicalTableScan", term("table", "[_DataStreamTable_0]")),
             term("b", "$1"), term("a", "$0")),
           term("group", "{0, 1}")),
         term("group", "{0}"), term("EXPR$0", "COUNT($1)")


Mime
View raw message