flink-commits mailing list archives

From fhue...@apache.org
Subject flink git commit: [FLINK-7854] [table] Reject lateral table outer joins with predicates in SQL.
Date Thu, 19 Oct 2017 19:19:28 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 024d8f577 -> 1d10cee91


[FLINK-7854] [table] Reject lateral table outer joins with predicates in SQL.

This closes #4846.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d10cee9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d10cee9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d10cee9

Branch: refs/heads/release-1.3
Commit: 1d10cee91a12734de09b8c9665437309242e2dd5
Parents: 024d8f5
Author: Xingcan Cui <xingcanc@gmail.com>
Authored: Mon Oct 16 16:49:14 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 19 19:30:05 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  3 +-
 .../calcite/FlinkCalciteSqlValidator.scala      | 29 +++++++++-
 .../sql/UserDefinedTableFunctionTest.scala      | 60 +++++++++++++++++++-
 .../sql/UserDefinedTableFunctionTest.scala      | 60 +++++++++++++++++++-
 4 files changed, 148 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 26f4f1b..8a60694 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -381,11 +381,12 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
     </tr>
     <tr>
     	<td>
-        <strong>User Defined Table Functions (UDTF)</strong><br>
+        <strong>Join with User Defined Table Functions (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
       <p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
+      <p><b>Note:</b> Currently, only literal <code>TRUE</code> is accepted as the predicate for a left outer join against a lateral table.</p>
 {% highlight sql %}
 SELECT users, tag 
 FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag
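
A minimal sketch of the two query shapes the new documentation note distinguishes (illustration only, not part of this commit; "MyTable" and "func1" are assumed to be a registered table and table function, mirroring the tests further below):

// Illustration only: query strings showing the accepted and the rejected form.
object LateralJoinPredicateExamples {

  // Accepted: the left outer join against the lateral table uses literal TRUE.
  val accepted: String =
    "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"

  // Rejected: any other predicate fails validation with a ValidationException
  // until FLINK-7865 is resolved.
  val rejected: String =
    "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s"
}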

http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
index 2bdf360..137a199 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl}
+import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl, SqlValidatorScope}
+import org.apache.flink.table.api.ValidationException
 
 /**
  * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
@@ -48,4 +49,30 @@ class FlinkCalciteSqlValidator(
       insert: SqlInsert): RelDataType = {
     typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
   }
+
+  override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = {
+    // Due to the improper translation of lateral table left outer join in Calcite, we need to
+    // temporarily forbid the common predicates until the problem is fixed (see FLINK-7865).
+    if (join.getJoinType == JoinType.LEFT &&
+      isCollectionTable(join.getRight)) {
+      join.getCondition match {
+        case c: SqlLiteral if c.booleanValue() && c.getValue.asInstanceOf[Boolean] =>
+          // We accept only literal true
+        case c if null != c =>
+          throw new ValidationException(
+            s"Left outer joins with a table function do not accept a predicate such as $c. " +
+              s"Only literal TRUE is accepted.")
+      }
+    }
+    super.validateJoin(join, scope)
+  }
+
+  private def isCollectionTable(node: SqlNode): Boolean = {
+    // TABLE (`func`(`foo`)) AS bar
+    node match {
+      case n: SqlCall if n.getKind == SqlKind.AS =>
+        n.getOperandList.get(0).getKind == SqlKind.COLLECTION_TABLE
+      case _ => false
+    }
+  }
 }
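
A rough, hedged sketch of the decision the validateJoin override above makes (not part of this commit; the simplified types below stand in for Calcite's SqlNode hierarchy): for a LEFT join whose right side is a lateral (collection) table, only the boolean literal TRUE is accepted as the join condition.

// Sketch only: simplified stand-ins for the parts of the SQL tree the validator inspects.
object LateralJoinCheckSketch {
  sealed trait Node
  case class BoolLiteral(value: Boolean) extends Node              // e.g. ON TRUE
  case class Call(kind: String, operands: List[Node]) extends Node // e.g. ON c = s

  // Mirrors the check in validateJoin: literal TRUE passes, any other
  // predicate would lead to a ValidationException.
  def isAcceptedPredicate(condition: Node): Boolean = condition match {
    case BoolLiteral(true) => true
    case _                 => false
  }

  // isAcceptedPredicate(BoolLiteral(true))   // true
  // isAcceptedPredicate(Call("EQUALS", Nil)) // false
}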

http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
index e091da2..3f7231e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
 import org.apache.flink.table.utils._
@@ -74,7 +75,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testLeftOuterJoin(): Unit = {
+  def testLeftOuterJoinWithLiteralTrue(): Unit = {
     val util = batchTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -100,6 +101,46 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
+  def testLeftOuterJoinAsSubQuery(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
+    util.addFunction("func1", func1)
+
+    val sqlQuery =
+    """
+      | SELECT *
+      | FROM MyTable2 LEFT OUTER JOIN
+      |  (SELECT c, s
+      |   FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true)
+      | ON c2 = s """.stripMargin
+
+    val expected = binaryNode(
+      "DataSetJoin",
+      batchTableNode(1),
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+         "DataSetCorrelate",
+          batchTableNode(0),
+          term("invocation", "func1($cor0.c)"),
+          term("function", func1.getClass.getCanonicalName),
+          term("rowType",
+            "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+          term("joinType","LEFT")
+        ),
+        term("select", "c", "f0 AS s")
+      ),
+      term("where", "=(c2, s)"),
+      term("join", "a2", "b2", "c2", "c", "s"),
+      term("joinType", "LeftOuterJoin")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testCustomType(): Unit = {
     val util = batchTestUtil()
     val func2 = new TableFunc2
@@ -235,4 +276,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  /**
+    * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
+    * join predicate can only be empty or literal true (the restriction should be removed in
+    * FLINK-7865).
+    */
+  @Test(expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1d10cee9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
index 58eedd0..ae71749 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
 import org.apache.flink.table.utils._
@@ -74,7 +75,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
-  def testLeftOuterJoin(): Unit = {
+  def testLeftOuterJoinWithLiteralTrue(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -100,6 +101,46 @@ class UserDefinedTableFunctionTest extends TableTestBase {
   }
 
   @Test
+  def testLeftOuterJoinAsSubQuery(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
+    util.addFunction("func1", func1)
+
+    val sqlQuery =
+      """
+        | SELECT *
+        | FROM MyTable2 LEFT OUTER JOIN
+        |  (SELECT c, s
+        |   FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true)
+        | ON c2 = s """.stripMargin
+
+    val expected = binaryNode(
+      "DataSetJoin",
+      batchTableNode(1),
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetCorrelate",
+          batchTableNode(0),
+          term("invocation", "func1($cor0.c)"),
+          term("function", func1.getClass.getCanonicalName),
+          term("rowType",
+            "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+          term("joinType","LEFT")
+        ),
+        term("select", "c", "f0 AS s")
+      ),
+      term("where", "=(c2, s)"),
+      term("join", "a2", "b2", "c2", "c", "s"),
+      term("joinType", "LeftOuterJoin")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testCustomType(): Unit = {
     val util = streamTestUtil()
     val func2 = new TableFunc2
@@ -234,4 +275,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  /**
+    * Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
+    * join predicate can only be empty or literal true (the restriction should be removed in
+    * FLINK-7865).
+    */
+  @Test(expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
 }

