From: twalthr@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: flink git commit: [FLINK-3580] [table] Add current time point functions
Date: Fri, 2 Sep 2016 15:45:14 +0000 (UTC)
Message-Id: <477a8912f0df4dc796bdd17950223fd9@git.apache.org>

Repository: flink
Updated Branches:
  refs/heads/master fb8f2c935 -> e376c003c

[FLINK-3580] [table] Add current time point functions

This closes #2441.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e376c003
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e376c003
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e376c003

Branch: refs/heads/master
Commit: e376c003c0fac456136e63e9d4cac4bfc07c02d4
Parents: fb8f2c9
Author: twalthr
Authored: Tue Aug 30 17:21:13 2016 +0200
Committer: twalthr
Committed: Fri Sep 2 17:44:30 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 169 ++++++++++++++++++-
 .../flink/api/scala/table/expressionDsl.scala   |  72 ++++++++
 .../flink/api/table/codegen/CodeGenerator.scala | 102 +++++++++++
 .../codegen/calls/CurrentTimePointCallGen.scala |  58 +++++++
 .../table/codegen/calls/ScalarFunctions.scala   |  25 +++
 .../flink/api/table/expressions/time.scala      |  47 ++++++
 .../api/table/validate/FunctionCatalog.scala    |   8 +-
 .../expressions/NonDeterministicTests.scala     |  89 ++++++++++
 .../table/expressions/ScalarFunctionsTest.scala |  45 +++++
 9 files changed, 613 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 68a2b95..9272ea3 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -981,6 +981,8 @@ functionCall = composite , "."
, functionIdentifier , [ "(" , [ expression , { " atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ; +fieldReference = "*" | identifier ; + nullLiteral = "Null(" , dataType , ")" ; timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ; @@ -989,7 +991,7 @@ timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUART {% endhighlight %} -Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data, and `functionIdentifier` specifies a supported scalar function. The +Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions. If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`. @@ -1521,6 +1523,61 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT) + + + {% highlight java %} +currentDate() +{% endhighlight %} + + +

Returns the current SQL date in UTC time zone.

+ + + + + + {% highlight java %} +currentTime() +{% endhighlight %} + + +

Returns the current SQL time in UTC time zone.

+ + + + + + {% highlight java %} +currentTimestamp() +{% endhighlight %} + + +

Returns the current SQL timestamp in UTC time zone.

+ + + + + + {% highlight java %} +localTime() +{% endhighlight %} + + +

Returns the current SQL time in local time zone.

+ + + + + + {% highlight java %} +localTimestamp() +{% endhighlight %} + + +

Returns the current SQL timestamp in local time zone.

+ + + @@ -1828,6 +1885,61 @@ TIMEPOINT.ceil(TimeIntervalUnit) + + + {% highlight scala %} +currentDate() +{% endhighlight %} + + +

Returns the current SQL date in UTC time zone.

+ + + + + + {% highlight scala %} +currentTime() +{% endhighlight %} + + +

Returns the current SQL time in UTC time zone.

+ + + + + + {% highlight scala %} +currentTimestamp() +{% endhighlight %} + + +

Returns the current SQL timestamp in UTC time zone.

+ + + + + + {% highlight scala %} +localTime() +{% endhighlight %} + + +

Returns the current SQL time in local time zone.

+ + + + + + {% highlight scala %} +localTimestamp() +{% endhighlight %} + + +

Returns the current SQL timestamp in local time zone.

+ + + @@ -2093,6 +2205,61 @@ CEIL(TIMEPOINT TO TIMEINTERVALUNIT) + + + {% highlight sql %} +CURRENT_DATE +{% endhighlight %} + + +

Returns the current SQL date in UTC time zone.

+ + + + + + {% highlight sql %} +CURRENT_TIME +{% endhighlight %} + + +

Returns the current SQL time in UTC time zone.

+ + + + + + {% highlight sql %} +CURRENT_TIMESTAMP +{% endhighlight %} + + +

Returns the current SQL timestamp in UTC time zone.

+ + + + + + {% highlight sql %} +LOCALTIME +{% endhighlight %} + + +

Returns the current SQL time in local time zone.

+ + + + + + {% highlight sql %} +LOCALTIMESTAMP +{% endhighlight %} + + +

Returns the current SQL timestamp in local time zone.

+ + + http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 9bfe6c3..942b07e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -402,3 +402,75 @@ trait ImplicitExpressionConversions { implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime) implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp) } + +// ------------------------------------------------------------------------------------------------ +// Expressions with no parameters +// ------------------------------------------------------------------------------------------------ + +/** + * Returns the current SQL date in UTC time zone. + */ +object currentDate { + + /** + * Returns the current SQL date in UTC time zone. + */ + def apply(): Expression = { + CurrentDate() + } +} + +/** + * Returns the current SQL time in UTC time zone. + */ +object currentTime { + + /** + * Returns the current SQL time in UTC time zone. + */ + def apply(): Expression = { + CurrentTime() + } +} + +/** + * Returns the current SQL timestamp in UTC time zone. + */ +object currentTimestamp { + + /** + * Returns the current SQL timestamp in UTC time zone. + */ + def apply(): Expression = { + CurrentTimestamp() + } +} + +/** + * Returns the current SQL time in local time zone. + */ +object localTime { + + /** + * Returns the current SQL time in local time zone. 
+ */ + def apply(): Expression = { + LocalTime() + } +} + +/** + * Returns the current SQL timestamp in local time zone. + */ +object localTimestamp { + + /** + * Returns the current SQL timestamp in local time zone. + */ + def apply(): Expression = { + LocalTimestamp() + } +} + + + http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 6463ff9..39ee26c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen import java.math.{BigDecimal => JBigDecimal} +import org.apache.calcite.avatica.util.DateTimeUtils import org.apache.calcite.rex._ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.`type`.SqlTypeName._ @@ -106,6 +107,10 @@ class CodeGenerator( // we use a LinkedHashSet to keep the insertion order private val reusableInitStatements = mutable.LinkedHashSet[String]() + // set of statements that will be added only once per record + // we use a LinkedHashSet to keep the insertion order + private val reusablePerRecordStatements = mutable.LinkedHashSet[String]() + // map of initial input unboxing expressions that will be added only once // (inputTerm, index) -> expr private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]() @@ -126,6 +131,13 @@ class CodeGenerator( } /** + * @return code block of statements that need to be placed in the SAM of the Function + */ + def reusePerRecordCode(): String = { + 
reusablePerRecordStatements.mkString("", "\n", "\n") + } + + /** * @return code block of statements that unbox input variables to a primitive variable * and a corresponding null flag variable */ @@ -234,6 +246,7 @@ class CodeGenerator( @Override public ${samHeader._1} throws Exception { ${samHeader._2.mkString("\n")} + ${reusePerRecordCode()} ${reuseInputUnboxingCode()} $bodyCode } @@ -1243,4 +1256,93 @@ class CodeGenerator( reusableInitStatements.add(constructorAccessibility) fieldTerm } + + /** + * Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]]. + */ + def addReusableTimestamp(): String = { + val fieldTerm = s"timestamp" + + val field = + s""" + |final long $fieldTerm = java.lang.System.currentTimeMillis(); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local timestamp to the beginning of the SAM of the generated [[Function]]. + */ + def addReusableLocalTimestamp(): String = { + val fieldTerm = s"localtimestamp" + + val timestamp = addReusableTimestamp() + + val field = + s""" + |final long $fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset(timestamp); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable time to the beginning of the SAM of the generated [[Function]]. + */ + def addReusableTime(): String = { + val fieldTerm = s"time" + + val timestamp = addReusableTimestamp() + + // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime() + val field = + s""" + |final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY}); + |if (time < 0) { + | time += ${DateTimeUtils.MILLIS_PER_DAY}; + |} + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local time to the beginning of the SAM of the generated [[Function]]. 
+ */ + def addReusableLocalTime(): String = { + val fieldTerm = s"localtime" + + val localtimestamp = addReusableLocalTimestamp() + + // adopted from org.apache.calcite.runtime.SqlFunctions.localTime() + val field = + s""" + |final int $fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY}); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + + /** + * Adds a reusable date to the beginning of the SAM of the generated [[Function]]. + */ + def addReusableDate(): String = { + val fieldTerm = s"date" + + val timestamp = addReusableTimestamp() + val time = addReusableTime() + + // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate() + val field = + s""" + |final int $fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY}); + |if ($time < 0) { + | $fieldTerm -= 1; + |} + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala new file mode 100644 index 0000000..4aaa209 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.codegen.calls + +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression} + +/** + * Generates function call to determine current time point (as date/time/timestamp) in + * local timezone or not. + */ +class CurrentTimePointCallGen( + targetType: TypeInformation[_], + local: Boolean) + extends CallGenerator { + + override def generate( + codeGenerator: CodeGenerator, + operands: Seq[GeneratedExpression]) + : GeneratedExpression = targetType match { + case SqlTimeTypeInfo.TIME if local => + val time = codeGenerator.addReusableLocalTime() + codeGenerator.generateNonNullLiteral(targetType, time) + + case SqlTimeTypeInfo.TIMESTAMP if local => + val timestamp = codeGenerator.addReusableLocalTimestamp() + codeGenerator.generateNonNullLiteral(targetType, timestamp) + + case SqlTimeTypeInfo.DATE => + val date = codeGenerator.addReusableDate() + codeGenerator.generateNonNullLiteral(targetType, date) + + case SqlTimeTypeInfo.TIME => + val time = codeGenerator.addReusableTime() + codeGenerator.generateNonNullLiteral(targetType, time) + + case SqlTimeTypeInfo.TIMESTAMP => + val timestamp = codeGenerator.addReusableTimestamp() + codeGenerator.generateNonNullLiteral(targetType, timestamp) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala index 8aa632f..24e8290 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala @@ -223,6 +223,31 @@ object ScalarFunctions { BuiltInMethod.CEIL.method, Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method))) + addSqlFunction( + CURRENT_DATE, + Seq(), + new CurrentTimePointCallGen(SqlTimeTypeInfo.DATE, local = false)) + + addSqlFunction( + CURRENT_TIME, + Seq(), + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = false)) + + addSqlFunction( + CURRENT_TIMESTAMP, + Seq(), + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = false)) + + addSqlFunction( + LOCALTIME, + Seq(), + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = true)) + + addSqlFunction( + LOCALTIMESTAMP, + Seq(), + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true)) + // ---------------------------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala index 48b512c..385b3d5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala @@ -201,3 +201,50 @@ case class TemporalCeil( } } +abstract class CurrentTimePoint( + targetType: TypeInformation[_], + local: Boolean) + extends LeafExpression { + + override private[flink] def resultType: TypeInformation[_] = targetType + + override private[flink] def validateInput(): ExprValidationResult = { + if (!TypeCheckUtils.isTimePoint(targetType)) { + ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " + + s"but get $targetType.") + } else if (local && targetType == SqlTimeTypeInfo.DATE) { + ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " + + s"type, but get $targetType.") + } else { + ValidationSuccess + } + } + + override def toString: String = if (local) { + s"local$targetType()" + } else { + s"current$targetType()" + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + val operator = targetType match { + case SqlTimeTypeInfo.TIME if local => SqlStdOperatorTable.LOCALTIME + case SqlTimeTypeInfo.TIMESTAMP if local => SqlStdOperatorTable.LOCALTIMESTAMP + case SqlTimeTypeInfo.DATE => SqlStdOperatorTable.CURRENT_DATE + case SqlTimeTypeInfo.TIME => SqlStdOperatorTable.CURRENT_TIME + case SqlTimeTypeInfo.TIMESTAMP => SqlStdOperatorTable.CURRENT_TIMESTAMP + } + relBuilder.call(operator) + } +} + +case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false) + +case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false) + +case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false) + +case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true) + +case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true) + 
http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index b9a3f71..9808672 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -151,7 +151,13 @@ object FunctionCatalog { "mod" -> classOf[Mod], // temporal functions - "extract" -> classOf[Extract] + "extract" -> classOf[Extract], + "currentDate" -> classOf[CurrentDate], + "currentTime" -> classOf[CurrentTime], + "currentTimestamp" -> classOf[CurrentTimestamp], + "localTime" -> classOf[LocalTime], + "localTimestamp" -> classOf[LocalTimestamp] + // TODO implement function overloading here // "floor" -> classOf[TemporalFloor] // "ceil" -> classOf[TemporalCeil] http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala new file mode 100644 index 0000000..de48849 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.utils.ExpressionTestBase +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.{Ignore, Test} + +/** + * Tests that can only be checked manually as they are non-deterministic. 
+ */ +class NonDeterministicTests extends ExpressionTestBase { + + @Ignore + @Test + def testCurrentDate(): Unit = { + testAllApis( + currentDate(), + "currentDate()", + "CURRENT_DATE", + "PLEASE CHECK MANUALLY") + } + + @Ignore + @Test + def testCurrentTime(): Unit = { + testAllApis( + currentTime(), + "currentTime()", + "CURRENT_TIME", + "PLEASE CHECK MANUALLY") + } + + @Ignore + @Test + def testCurrentTimestamp(): Unit = { + testAllApis( + currentTimestamp(), + "currentTimestamp()", + "CURRENT_TIMESTAMP", + "PLEASE CHECK MANUALLY") + } + + @Ignore + @Test + def testLocalTimestamp(): Unit = { + testAllApis( + localTimestamp(), + "localTimestamp()", + "LOCALTIMESTAMP", + "PLEASE CHECK MANUALLY") + } + + @Ignore + @Test + def testLocalTime(): Unit = { + testAllApis( + localTime(), + "localTime()", + "LOCALTIME", + "PLEASE CHECK MANUALLY") + } + + // ---------------------------------------------------------------------------------------------- + + override def testData: Any = new Row(0) + + override def typeInfo: TypeInformation[Any] = + new RowTypeInfo(Seq[TypeInformation[_]]()).asInstanceOf[TypeInformation[Any]] +} http://git-wip-us.apache.org/repos/asf/flink/blob/e376c003/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index 7ab0c7d..516bfca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -753,6 +753,51 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testCurrentTimePoint(): Unit = { + + 
// current time points are non-deterministic + // we just test the format of the output + // manual test can be found in NonDeterministicTests + + testAllApis( + currentDate().cast(Types.STRING).charLength(), + "currentDate().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR))", + "10") + + testAllApis( + currentTime().cast(Types.STRING).charLength(), + "currentTime().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR))", + "8") + + testAllApis( + currentTimestamp().cast(Types.STRING).charLength() >= 22, + "currentTimestamp().cast(STRING).charLength() >= 22", + "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 22", + "true") + + testAllApis( + localTimestamp().cast(Types.STRING).charLength() >= 22, + "localTimestamp().cast(STRING).charLength() >= 22", + "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR)) >= 22", + "true") + + testAllApis( + localTime().cast(Types.STRING).charLength(), + "localTime().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR))", + "8") + + // comparisons are deterministic + testAllApis( + localTimestamp() === localTimestamp(), + "localTimestamp() === localTimestamp()", + "LOCALTIMESTAMP = LOCALTIMESTAMP", + "true") + } + // ---------------------------------------------------------------------------------------------- def testData = {