flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-3580] [table] Add current time point functions
Date Fri, 02 Sep 2016 15:45:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master fb8f2c935 -> e376c003c


[FLINK-3580] [table] Add current time point functions

This closes #2441.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e376c003
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e376c003
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e376c003

Branch: refs/heads/master
Commit: e376c003c0fac456136e63e9d4cac4bfc07c02d4
Parents: fb8f2c9
Author: twalthr <twalthr@apache.org>
Authored: Tue Aug 30 17:21:13 2016 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Fri Sep 2 17:44:30 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 169 ++++++++++++++++++-
 .../flink/api/scala/table/expressionDsl.scala   |  72 ++++++++
 .../flink/api/table/codegen/CodeGenerator.scala | 102 +++++++++++
 .../codegen/calls/CurrentTimePointCallGen.scala |  58 +++++++
 .../table/codegen/calls/ScalarFunctions.scala   |  25 +++
 .../flink/api/table/expressions/time.scala      |  47 ++++++
 .../api/table/validate/FunctionCatalog.scala    |   8 +-
 .../expressions/NonDeterministicTests.scala     |  89 ++++++++++
 .../table/expressions/ScalarFunctionsTest.scala |  45 +++++
 9 files changed, 613 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 68a2b95..9272ea3 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -981,6 +981,8 @@ functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "
 
 atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
 
+fieldReference = "*" | identifier ;
+
 nullLiteral = "Null(" , dataType , ")" ;
 
 timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
@@ -989,7 +991,7 @@ timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUART
 
 {% endhighlight %}
 
-Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data, and `functionIdentifier` specifies a supported scalar function. The
+Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The
 column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
 
 If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
@@ -1521,6 +1523,61 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+currentDate()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL date in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+currentTime()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+currentTimestamp()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+localTime()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in local time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+localTimestamp()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in local time zone.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -1828,6 +1885,61 @@ TIMEPOINT.ceil(TimeIntervalUnit)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+currentDate()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL date in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+currentTime()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+currentTimestamp()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+localTime()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in local time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+localTimestamp()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in local time zone.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -2093,6 +2205,61 @@ CEIL(TIMEPOINT TO TIMEINTERVALUNIT)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight sql %}
+CURRENT_DATE
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL date in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight sql %}
+CURRENT_TIME
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight sql %}
+CURRENT_TIMESTAMP
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in UTC time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight sql %}
+LOCALTIME
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL time in local time zone.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight sql %}
+LOCALTIMESTAMP
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the current SQL timestamp in local time zone.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 9bfe6c3..942b07e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -402,3 +402,75 @@ trait ImplicitExpressionConversions {
   implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
   implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp)
 }
+
+// ------------------------------------------------------------------------------------------------
+// Expressions with no parameters
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Returns the current SQL date in UTC time zone.
+  */
+object currentDate {
+
+  /**
+    * Returns the current SQL date in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentDate()
+  }
+}
+
+/**
+  * Returns the current SQL time in UTC time zone.
+  */
+object currentTime {
+
+  /**
+    * Returns the current SQL time in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentTime()
+  }
+}
+
+/**
+  * Returns the current SQL timestamp in UTC time zone.
+  */
+object currentTimestamp {
+
+  /**
+    * Returns the current SQL timestamp in UTC time zone.
+    */
+  def apply(): Expression = {
+    CurrentTimestamp()
+  }
+}
+
+/**
+  * Returns the current SQL time in local time zone.
+  */
+object localTime {
+
+  /**
+    * Returns the current SQL time in local time zone.
+    */
+  def apply(): Expression = {
+    LocalTime()
+  }
+}
+
+/**
+  * Returns the current SQL timestamp in local time zone.
+  */
+object localTimestamp {
+
+  /**
+    * Returns the current SQL timestamp in local time zone.
+    */
+  def apply(): Expression = {
+    LocalTimestamp()
+  }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 6463ff9..39ee26c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen
 
 import java.math.{BigDecimal => JBigDecimal}
 
+import org.apache.calcite.avatica.util.DateTimeUtils
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
@@ -106,6 +107,10 @@ class CodeGenerator(
   // we use a LinkedHashSet to keep the insertion order
   private val reusableInitStatements = mutable.LinkedHashSet[String]()
 
+  // set of statements that will be added only once per record
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusablePerRecordStatements = mutable.LinkedHashSet[String]()
+
   // map of initial input unboxing expressions that will be added only once
   // (inputTerm, index) -> expr
   private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
@@ -126,6 +131,13 @@ class CodeGenerator(
   }
 
   /**
+    * @return code block of statements that need to be placed in the SAM of the Function
+    */
+  def reusePerRecordCode(): String = {
+    reusablePerRecordStatements.mkString("", "\n", "\n")
+  }
+
+  /**
     * @return code block of statements that unbox input variables to a primitive variable
     *         and a corresponding null flag variable
     */
@@ -234,6 +246,7 @@ class CodeGenerator(
         @Override
         public ${samHeader._1} throws Exception {
           ${samHeader._2.mkString("\n")}
+          ${reusePerRecordCode()}
           ${reuseInputUnboxingCode()}
           $bodyCode
         }
@@ -1243,4 +1256,93 @@ class CodeGenerator(
     reusableInitStatements.add(constructorAccessibility)
     fieldTerm
   }
+
+  /**
+    * Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]].
+    */
+  def addReusableTimestamp(): String = {
+    val fieldTerm = s"timestamp"
+
+    val field =
+      s"""
+        |final long $fieldTerm = java.lang.System.currentTimeMillis();
+        |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+    /**
+    * Adds a reusable local timestamp to the beginning of the SAM of the generated [[Function]].
+    */
+  def addReusableLocalTimestamp(): String = {
+    val fieldTerm = s"localtimestamp"
+
+    val timestamp = addReusableTimestamp()
+
+    val field =
+      s"""
+        |final long $fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset(timestamp);
+        |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable time to the beginning of the SAM of the generated [[Function]].
+    */
+  def addReusableTime(): String = {
+    val fieldTerm = s"time"
+
+    val timestamp = addReusableTimestamp()
+
+    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
+    val field =
+      s"""
+        |final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
+        |if (time < 0) {
+        |  time += ${DateTimeUtils.MILLIS_PER_DAY};
+        |}
+        |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable local time to the beginning of the SAM of the generated [[Function]].
+    */
+  def addReusableLocalTime(): String = {
+    val fieldTerm = s"localtime"
+
+    val localtimestamp = addReusableLocalTimestamp()
+
+    // adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
+    val field =
+      s"""
+        |final int $fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
+        |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+
+  /**
+    * Adds a reusable date to the beginning of the SAM of the generated [[Function]].
+    */
+  def addReusableDate(): String = {
+    val fieldTerm = s"date"
+
+    val timestamp = addReusableTimestamp()
+    val time = addReusableTime()
+
+    // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
+    val field =
+      s"""
+        |final int $fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
+        |if ($time < 0) {
+        |  $fieldTerm -= 1;
+        |}
+        |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
new file mode 100644
index 0000000..4aaa209
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates function call to determine current time point (as date/time/timestamp) in
+  * local timezone or not.
+  */
+class CurrentTimePointCallGen(
+    targetType: TypeInformation[_],
+    local: Boolean)
+  extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = targetType match {
+    case SqlTimeTypeInfo.TIME if local =>
+      val time = codeGenerator.addReusableLocalTime()
+      codeGenerator.generateNonNullLiteral(targetType, time)
+
+    case SqlTimeTypeInfo.TIMESTAMP if local =>
+      val timestamp = codeGenerator.addReusableLocalTimestamp()
+      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+
+    case SqlTimeTypeInfo.DATE =>
+      val date = codeGenerator.addReusableDate()
+      codeGenerator.generateNonNullLiteral(targetType, date)
+
+    case SqlTimeTypeInfo.TIME =>
+      val time = codeGenerator.addReusableTime()
+      codeGenerator.generateNonNullLiteral(targetType, time)
+
+    case SqlTimeTypeInfo.TIMESTAMP =>
+      val timestamp = codeGenerator.addReusableTimestamp()
+      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index 8aa632f..24e8290 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -223,6 +223,31 @@ object ScalarFunctions {
       BuiltInMethod.CEIL.method,
       Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
 
+  addSqlFunction(
+    CURRENT_DATE,
+    Seq(),
+    new CurrentTimePointCallGen(SqlTimeTypeInfo.DATE, local = false))
+
+  addSqlFunction(
+    CURRENT_TIME,
+    Seq(),
+    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = false))
+
+  addSqlFunction(
+    CURRENT_TIMESTAMP,
+    Seq(),
+    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = false))
+
+  addSqlFunction(
+    LOCALTIME,
+    Seq(),
+    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = true))
+
+  addSqlFunction(
+    LOCALTIMESTAMP,
+    Seq(),
+    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
+
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index 48b512c..385b3d5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -201,3 +201,50 @@ case class TemporalCeil(
   }
 }
 
+abstract class CurrentTimePoint(
+    targetType: TypeInformation[_],
+    local: Boolean)
+  extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = targetType
+
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(targetType)) {
+      ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
+        s"but get $targetType.")
+    } else if (local && targetType == SqlTimeTypeInfo.DATE) {
+      ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
+        s"type, but get $targetType.")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = if (local) {
+    s"local$targetType()"
+  } else {
+    s"current$targetType()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operator = targetType match {
+      case SqlTimeTypeInfo.TIME if local => SqlStdOperatorTable.LOCALTIME
+      case SqlTimeTypeInfo.TIMESTAMP if local => SqlStdOperatorTable.LOCALTIMESTAMP
+      case SqlTimeTypeInfo.DATE => SqlStdOperatorTable.CURRENT_DATE
+      case SqlTimeTypeInfo.TIME => SqlStdOperatorTable.CURRENT_TIME
+      case SqlTimeTypeInfo.TIMESTAMP => SqlStdOperatorTable.CURRENT_TIMESTAMP
+    }
+    relBuilder.call(operator)
+  }
+}
+
+case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
+
+case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
+
+case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
+
+case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
+
+case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index b9a3f71..9808672 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -151,7 +151,13 @@ object FunctionCatalog {
     "mod" -> classOf[Mod],
 
     // temporal functions
-    "extract" -> classOf[Extract]
+    "extract" -> classOf[Extract],
+    "currentDate" -> classOf[CurrentDate],
+    "currentTime" -> classOf[CurrentTime],
+    "currentTimestamp" -> classOf[CurrentTimestamp],
+    "localTime" -> classOf[LocalTime],
+    "localTimestamp" -> classOf[LocalTimestamp]
+
     // TODO implement function overloading here
     // "floor" -> classOf[TemporalFloor]
     // "ceil" -> classOf[TemporalCeil]

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
new file mode 100644
index 0000000..de48849
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.{Ignore, Test}
+
+/**
+  * Tests that can only be checked manually as they are non-deterministic.
+  */
+class NonDeterministicTests extends ExpressionTestBase {
+
+  @Ignore
+  @Test
+  def testCurrentDate(): Unit = {
+    testAllApis(
+      currentDate(),
+      "currentDate()",
+      "CURRENT_DATE",
+      "PLEASE CHECK MANUALLY")
+  }
+
+  @Ignore
+  @Test
+  def testCurrentTime(): Unit = {
+    testAllApis(
+      currentTime(),
+      "currentTime()",
+      "CURRENT_TIME",
+      "PLEASE CHECK MANUALLY")
+  }
+
+  @Ignore
+  @Test
+  def testCurrentTimestamp(): Unit = {
+    testAllApis(
+      currentTimestamp(),
+      "currentTimestamp()",
+      "CURRENT_TIMESTAMP",
+      "PLEASE CHECK MANUALLY")
+  }
+
+  @Ignore
+  @Test
+  def testLocalTimestamp(): Unit = {
+    testAllApis(
+      localTimestamp(),
+      "localTimestamp()",
+      "LOCALTIMESTAMP",
+      "PLEASE CHECK MANUALLY")
+  }
+
+  @Ignore
+  @Test
+  def testLocalTime(): Unit = {
+    testAllApis(
+      localTime(),
+      "localTime()",
+      "LOCALTIME",
+      "PLEASE CHECK MANUALLY")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def testData: Any = new Row(0)
+
+  override def typeInfo: TypeInformation[Any] =
+    new RowTypeInfo(Seq[TypeInformation[_]]()).asInstanceOf[TypeInformation[Any]]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 7ab0c7d..516bfca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -753,6 +753,51 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "true")
   }
 
+  @Test
+  def testCurrentTimePoint(): Unit = {
+
+    // current time points are non-deterministic
+    // we just test the format of the output
+    // manual test can be found in NonDeterministicTests
+
+    testAllApis(
+      currentDate().cast(Types.STRING).charLength(),
+      "currentDate().cast(STRING).charLength()",
+      "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR))",
+      "10")
+
+    testAllApis(
+      currentTime().cast(Types.STRING).charLength(),
+      "currentTime().cast(STRING).charLength()",
+      "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR))",
+      "8")
+
+    testAllApis(
+      currentTimestamp().cast(Types.STRING).charLength() >= 22,
+      "currentTimestamp().cast(STRING).charLength() >= 22",
+      "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 22",
+      "true")
+
+    testAllApis(
+      localTimestamp().cast(Types.STRING).charLength() >= 22,
+      "localTimestamp().cast(STRING).charLength() >= 22",
+      "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR)) >= 22",
+      "true")
+
+    testAllApis(
+      localTime().cast(Types.STRING).charLength(),
+      "localTime().cast(STRING).charLength()",
+      "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR))",
+      "8")
+
+    // comparisons are deterministic
+    testAllApis(
+      localTimestamp() === localTimestamp(),
+      "localTimestamp() === localTimestamp()",
+      "LOCALTIMESTAMP = LOCALTIMESTAMP",
+      "true")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   def testData = {


Mime
View raw message