flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [1/2] flink git commit: [FLINK-4068] [table] Move constant computations out of code-generated functions.
Date Tue, 04 Oct 2016 12:36:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master 171d10930 -> f00e1e7c5


[FLINK-4068] [table] Move constant computations out of code-generated functions.

This closes #2560.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7113394
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7113394
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7113394

Branch: refs/heads/master
Commit: a71133945e66ec471b4cb07da0693d545c40923c
Parents: 171d109
Author: Jark Wu <wuchong.wc@alibaba-inc.com>
Authored: Wed Sep 28 10:31:59 2016 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Tue Oct 4 12:02:36 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala |  48 +++++----
 .../flink/api/table/FlinkRelBuilder.scala       |   5 +-
 .../api/table/StreamTableEnvironment.scala      |  40 ++++---
 .../flink/api/table/TableEnvironment.scala      |   5 +-
 .../api/table/plan/logical/operators.scala      |   3 +-
 .../flink/api/table/plan/nodes/FlinkCalc.scala  |   2 +-
 .../flink/api/scala/batch/ExplainTest.scala     |   4 +-
 .../api/table/BatchTableEnvironmentTest.scala   | 102 ++++++++++++++++++
 .../api/table/StreamTableEnvironmentTest.scala  | 106 +++++++++++++++++++
 .../src/test/scala/resources/testJoin0.out      |   2 +-
 .../src/test/scala/resources/testJoin1.out      |   2 +-
 11 files changed, 277 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index ad3ff7a..10c2450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 
@@ -228,21 +229,12 @@ abstract class BatchTableEnvironment(
   }
 
   /**
-    * Translates a [[Table]] into a [[DataSet]].
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
-    *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
-    * @tparam A The type of the resulting [[DataSet]].
-    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    * @param relNode The original [[RelNode]] tree
+    * @return The optimized [[RelNode]] tree
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-
-    validateType(tpe)
-
-    val relNode = table.getRelNode
+  private[flink] def optimize(relNode: RelNode): RelNode = {
 
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -253,8 +245,7 @@ abstract class BatchTableEnvironment(
 
     val dataSetPlan = try {
       optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-    }
-    catch {
+    } catch {
       case e: CannotPlanException =>
         throw new TableException(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
@@ -263,13 +254,32 @@ abstract class BatchTableEnvironment(
             s"Please check the documentation for the set of currently supported SQL features.")
       case t: TableException =>
         throw new TableException(
-        s"Cannot generate a valid execution plan for the given query: \n\n" +
-          s"${RelOptUtil.toString(relNode)}\n" +
-          s"${t.msg}\n" +
-          s"Please check the documentation for the set of currently supported SQL features.")
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(relNode)}\n" +
+            s"${t.msg}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
       case a: AssertionError =>
         throw a.getCause
     }
+    dataSetPlan
+  }
+
+  /**
+    * Translates a [[Table]] into a [[DataSet]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+
+    validateType(tpe)
+
+    val dataSetPlan = optimize(table.getRelNode)
 
     dataSetPlan match {
       case node: DataSetRel =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index 3827f05..34ed4ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.table
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
 import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
 import org.apache.calcite.tools.Frameworks.PlannerAction
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
 
@@ -66,6 +66,7 @@ object FlinkRelBuilder {
       }
     })
     val planner = clusters(0).getPlanner
+    planner.setExecutor(config.getExecutor)
     val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]
 
     // create Flink type factory

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index e3e5751..44d90ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
@@ -228,22 +230,12 @@ abstract class StreamTableEnvironment(
   }
 
   /**
-    * Translates a [[Table]] into a [[DataStream]].
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    * @param relNode The root node of the relational expression tree.
+    * @return The optimized [[RelNode]] tree
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    validateType(tpe)
-
-    val relNode = table.getRelNode
-
+  private[flink] def optimize(relNode: RelNode): RelNode = {
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
@@ -262,6 +254,26 @@ abstract class StreamTableEnvironment(
            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
             s"Please check the documentation for the set of currently supported SQL features.")
     }
+    dataStreamPlan
+  }
+
+
+  /**
+    * Translates a [[Table]] into a [[DataStream]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    validateType(tpe)
+
+   val dataStreamPlan = optimize(table.getRelNode)
 
     dataStreamPlan match {
       case node: DataStreamRel =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index f56df0c..02204b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.rex.RexExecutorImpl
+import org.apache.calcite.schema.{Schemas, SchemaPlus}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -77,6 +78,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     .costFactory(new DataSetCostFactory)
     .typeSystem(new FlinkTypeSystem)
     .operatorTable(sqlOperatorTable)
+    // set the executor to evaluate constant expressions
+    .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
     .build
 
  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ccdab85..066e9d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -87,7 +87,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
       // Calcite's RelBuilder does not translate identity projects even if they rename fields.
       //   Add a projection ourselves (will be automatically removed by translation rules).
       val project = LogicalProject.create(relBuilder.peek(),
-        projectList.map(_.toRexNode(relBuilder)).asJava,
+        // avoid AS call
+        projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
         projectList.map(_.name).asJava)
       relBuilder.build()  // pop previous relNode
       relBuilder.push(project)

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
index aa5492f..d5f8010 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
@@ -134,7 +134,7 @@ trait FlinkCalc {
     val proj = calcProgram.getProjectList.asScala.toList
     val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
     val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
 
     proj
       .map(expression(_, inFields, Some(localExprs)))

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
index ab70ec5..9d00dda 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
@@ -72,7 +72,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -87,7 +87,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
new file mode 100644
index 0000000..0344dee
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.junit.Assert._
+import org.junit.Test
+
+
+class BatchTableEnvironmentTest {
+
+  @Test
+  def testReduceExpressionForSQL(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest') " +
+      "FROM MyTable WHERE a>(1+7)"
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+  }
+
+  @Test
+  def testReduceExpressionForTableAPI(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+
+    val table = ds
+      .where('a > (1+7))
+      .select((3+4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor())
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
new file mode 100644
index 0000000..52bf9ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala.stream.utils.StreamTestData
+import org.apache.flink.api.scala.table._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+import org.junit.Assert._
+
+
+
+class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
+
+  @Test
+  def testReduceExpression(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest') " +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+  }
+
+  @Test
+  def testReduceExpressionForTableAPI(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+
+    val table = t
+      .where('a > (1+7))
+      .select((3+4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor())
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index f71ea9f..11961ef 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -36,7 +36,7 @@ Stage 6 : Data Source
 			Partitioning : RANDOM_PARTITIONED
 
 			Stage 1 : FlatMap
-				content : select: (a, c AS b)
+				content : select: (a, c)
 				ship_strategy : Forward
 				exchange_mode : PIPELINED
 				driver_strategy : FlatMap

http://git-wip-us.apache.org/repos/asf/flink/blob/a7113394/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index f117cd9..c6e8b34 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -121,7 +121,7 @@ Stage 6 : Data Source
 			Filter Factor : (none)
 
 			Stage 1 : FlatMap
-				content : select: (a, c AS b)
+				content : select: (a, c)
 				ship_strategy : Forward
 				exchange_mode : PIPELINED
 				driver_strategy : FlatMap


Mime
View raw message