flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject flink git commit: [FLINK-7014] [table] Expose isDeterministic interface to UserDefinedFunction
Date Thu, 29 Jun 2017 02:55:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 958d3762d -> b59148cf7


[FLINK-7014] [table] Expose isDeterministic interface to UserDefinedFunction

This closes #4200


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59148cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59148cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59148cf

Branch: refs/heads/master
Commit: b59148cf7d206951191a24e6a6ce43937db5f045
Parents: 958d376
Author: Xpray <leonxpray@gmail.com>
Authored: Wed Jun 28 20:16:56 2017 +0800
Committer: Jark Wu <jark@apache.org>
Committed: Thu Jun 29 10:54:20 2017 +0800

----------------------------------------------------------------------
 .../table/functions/UserDefinedFunction.scala   |  8 ++++
 .../table/functions/utils/AggSqlFunction.scala  |  2 +
 .../functions/utils/ScalarSqlFunction.scala     |  1 +
 .../functions/utils/TableSqlFunction.scala      |  1 +
 .../flink/table/ExpressionReductionTest.scala   | 50 +++++++++++++++++++-
 5 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index e9e01ee..7c57ea0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -40,6 +40,14 @@ abstract class UserDefinedFunction extends Serializable {
   @throws(classOf[Exception])
   def close(): Unit = {}
 
+  /**
+    * @return true iff a call to this function is guaranteed to always return
+    *         the same result given the same parameters; true is assumed by default.
+    *         If the user's function is not purely functional (like random(), date(), now()...),
+    *         isDeterministic must return false.
+    */
+  def isDeterministic: Boolean = true
+
   final def functionIdentifier: String = {
     val md5  =  DigestUtils.md5Hex(serialize(this))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 816bc52..be5501a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
 }
 
 object AggSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa..b1b45cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -52,6 +52,7 @@ class ScalarSqlFunction(
 
   def getScalarFunction = scalarFunction
 
+  override def isDeterministic: Boolean = scalarFunction.isDeterministic
 }
 
 object ScalarSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index 74f3374..b37d75b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -69,6 +69,7 @@ class TableSqlFunction(
     */
   def getPojoFieldMapping = functionImpl.fieldIndexes
 
+  override def isDeterministic: Boolean = udtf.isDeterministic
 }
 
 object TableSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
index 59eff4b..2e9ee46 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 class ExpressionReductionTest extends TableTestBase {
 
@@ -458,4 +459,51 @@ class ExpressionReductionTest extends TableTestBase {
     util.verifySql(sqlQuery, expected)
   }
 
+  // TODO: this NPE is caused by Calcite; the test should pass once [CALCITE-1860] is fixed
+  @Ignore
+  def testReduceDeterministicUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    // if isDeterministic = true, this causes a Calcite NPE, which will be fixed by [CALCITE-1860]
+    val result = table
+      .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+      .where("d.isNull")
+      .select('a, 'b, 'c)
+
+    val expected: String = streamTableNode(0)
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceNonDeterministicUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
+      .where("d.isNull")
+      .select('a, 'b, 'c)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object NonDeterministicNullFunc extends ScalarFunction {
+  def eval(): String = null
+  override def isDeterministic = false
+}
+
+object DeterministicNullFunc extends ScalarFunction {
+  def eval(): String = null
+  override def isDeterministic = true
 }


Mime
View raw message