flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
Date Tue, 25 Sep 2018 17:16:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627664#comment-16627664 ]

ASF GitHub Bot commented on FLINK-6847:
---------------------------------------

asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index eac6d16eaf8..7d9aee22956 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3176,6 +3176,18 @@ TIMESTAMPADD(unit, interval, timevalue)
         <p>E.g., <code>TIMESTAMPADD(WEEK, 1, DATE '2003-01-02')</code> returns <code>2003-01-09</code>.</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+TIMESTAMPDIFF(unit, timestamp1, timestamp2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of unit intervals between timestamp1 and timestamp2. The unit for the interval is given by the unit argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers</a> table. E.g., <code>TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00')</code> returns <code>1</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -3421,6 +3433,18 @@ dateFormat(TIMESTAMP, STRING)
         <p>E.g., <code>dateFormat(ts, '%Y, %d %M')</code> results in strings formatted as "2017, 05 May".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of unit intervals between datetime1 and datetime2. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>MONTH</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers</a> table. E.g., <code>timestampDiff(TimeIntervalUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> returns <code>1</code>.</p>
+      </td>
+    </tr>
+
     </tbody>
 </table>
 </div>
@@ -3666,6 +3690,18 @@ dateFormat(TIMESTAMP, STRING)
         <p>E.g., <code>dateFormat('ts, "%Y, %d %M")</code> results in strings formatted as "2017, 05 May".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of unit intervals between datetime1 and datetime2. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>MONTH</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers</a> table. E.g., <code>timestampDiff(TimeIntervalUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> returns <code>1</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -5319,4 +5355,65 @@ The following table lists specifiers for date format functions.
   </tbody>
 </table>
 
+Time Interval and Point Unit Specifiers
+---------------------------------------
+
+The following table lists the specifiers for time interval and time point units.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Interval Unit</th>
+      <th class="text-center">Point Unit</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr><td>YEAR</td>
+  <td>YEAR</td>
+  </tr>
+  <tr><td>QUARTER</td>
+  <td>QUARTER</td>
+  </tr>
+  <tr><td>MONTH</td>
+  <td>MONTH</td>
+  </tr>
+  <tr><td>WEEK</td>
+  <td>WEEK</td>
+  </tr>
+  <tr><td>DAY</td>
+  <td>DAY</td>
+  </tr>
+  <tr><td>HOUR</td>
+  <td>HOUR</td>
+  </tr>
+  <tr><td>MINUTE</td>
+  <td>MINUTE</td>
+  </tr>
+  <tr><td>SECOND</td>
+  <td>SECOND</td>
+  </tr>
+  <tr><td>YEAR_TO_MONTH</td>
+  <td>MICROSECOND</td>
+  </tr>
+  <tr><td>DAY_TO_HOUR</td>
+  <td>MILLISECOND</td>
+  </tr>
+  <tr><td>DAY_TO_MINUTE</td>
+  <td></td>
+  </tr>
+  <tr><td>DAY_TO_SECOND</td>
+  <td></td>
+  </tr>
+  <tr><td>HOUR_TO_MINUTE</td>
+  <td></td>
+  </tr>
+  <tr><td>HOUR_TO_SECOND</td>
+  <td></td>
+  </tr>
+  <tr><td>MINUTE_TO_SECOND</td>
+  <td></td>
+  </tr>
+  </tbody>
+</table>
+
 {% top %}
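
For orientation, a minimal usage sketch of the functions documented above, assuming a StreamTableEnvironment tEnv and a registered table Orders with TIMESTAMP columns orderTime and shipTime (the table and column names are illustrative, not part of this PR):

  import org.apache.flink.table.api.scala._
  import org.apache.flink.table.expressions.TimePointUnit

  // SQL: signed number of DAY intervals between the two time points.
  val bySql = tEnv.sqlQuery(
    "SELECT TIMESTAMPDIFF(DAY, orderTime, shipTime) AS shipDays FROM Orders")

  // Scala Table API: the expression DSL object added in this PR takes a TimePointUnit.
  val byTable = tEnv.scan("Orders")
    .select(timestampDiff(TimePointUnit.DAY, 'orderTime, 'shipTime) as 'shipDays)
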
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index dfe69cb0411..eb4aad7e40a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, Unb
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
 
@@ -1104,6 +1105,34 @@ object dateFormat {
   }
 }
 
+/**
+ * Returns the (signed) number of time unit intervals between timestamp1 and timestamp2.
+ *
+ * For example <code>timestampDiff(TimePointUnit.DAY, "2016-06-15".toDate,
+ * "2016-06-18".toDate)</code> returns 3.
+ */
+object timestampDiff {
+
+  /**
+    * Returns the (signed) number of time unit intervals between timestamp1 and timestamp2.
+    *
+    * For example timestampDiff(TimePointUnit.DAY, "2016-06-15".toDate,
+    * "2016-06-18".toDate) returns 3.
+    *
+    * @param timePointUnit The unit in which to express the difference.
+    * @param timestamp1 The first time point.
+    * @param timestamp2 The second time point.
+    * @return The number of intervals as an integer value.
+    */
+  def apply(
+    timePointUnit: TimePointUnit,
+    timestamp1: Expression,
+    timestamp2: Expression
+  ): Expression = {
+    TimestampDiff(timePointUnit, timestamp1, timestamp2)
+  }
+}
+
 /**
   * Creates an array of literals. The array will be an array of objects (not primitives).
   */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 6cabe212213..c0f81a547f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -771,7 +771,7 @@ abstract class CodeGenerator(
         val right = operands(1)
         requireTemporal(left)
         requireTemporal(right)
-        generateTemporalPlusMinus(plus = true, nullCheck, left, right, config)
+        generateTemporalPlusMinus(plus = true, nullCheck, resultType, left, right, config)
 
       case MINUS if isNumeric(resultType) =>
         val left = operands.head
@@ -785,7 +785,7 @@ abstract class CodeGenerator(
         val right = operands(1)
         requireTemporal(left)
         requireTemporal(right)
-        generateTemporalPlusMinus(plus = false, nullCheck, left, right, config)
+        generateTemporalPlusMinus(plus = false, nullCheck, resultType, left, right, config)
 
       case MULTIPLY if isNumeric(resultType) =>
         val left = operands.head
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index a29259b8ad8..92f427300cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.codegen.calls
 
 import java.lang.reflect.Method
 
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
@@ -507,6 +508,29 @@ object FunctionGenerator {
     Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.DATE),
     new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
 
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]),
+      SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]),
+      SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]),
+      SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE),
+    new TimestampDiffCallGen)
+
   addSqlFunction(
     FLOOR,
     Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
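
The four registrations above enumerate every (TIMESTAMP | DATE) x (TIMESTAMP | DATE) operand combination for TIMESTAMP_DIFF and map each of them to the same TimestampDiffCallGen. A toy sketch of that enumeration (illustration only, not Flink's FunctionGenerator internals):

  val timeTypes = Seq("TIMESTAMP", "DATE")
  val signatures = for (a <- timeTypes; b <- timeTypes) yield Seq("TimeUnit", a, b)
  signatures.foreach(sig => println(s"TIMESTAMP_DIFF(${sig.mkString(", ")}) -> TimestampDiffCallGen"))
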
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 57f1618a2e3..cd8ea6174b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.codegen.calls
 import java.math.MathContext
 
 import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
-import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
+import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnit, TimeUnitRange}
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
@@ -825,6 +826,7 @@ object ScalarOperators {
   def generateTemporalPlusMinus(
       plus: Boolean,
       nullCheck: Boolean,
+      resultType: TypeInformation[_],
       left: GeneratedExpression,
       right: GeneratedExpression,
       config: TableConfig)
@@ -861,6 +863,47 @@ object ScalarOperators {
           (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
         }
 
+      case (l: SqlTimeTypeInfo[_], r: SqlTimeTypeInfo[_]) if !plus =>
+        resultType match {
+          case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+            generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+              (ll, rr) => (l, r) match {
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+                    s"($ll, $rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+                    s"($ll * ${MILLIS_PER_DAY}L, $rr)"
+                case _ =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}($ll, $rr)"
+               }
+            }
+
+          case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+            generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+              (ll, rr) => (l, r) match {
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"$ll $op $rr"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE) =>
+                  s"($ll * ${MILLIS_PER_DAY}L) $op ($rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+                  s"$ll $op ($rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"($ll * ${MILLIS_PER_DAY}L) $op $rr"
+              }
+            }
+
+          case SqlTimeTypeInfo.TIMESTAMP => // Timestamp arithmetic minus
+            generateOperatorIfNotNull(nullCheck, LONG_TYPE_INFO, left, right) {
+              (ll, rr) => s"(long)($ll $op $rr)"
+            }
+
+          case SqlTimeTypeInfo.DATE => // Date arithmetic minus
+            generateOperatorIfNotNull(nullCheck, INT_TYPE_INFO, left, right) {
+              (ll, rr) => s"(int)($ll $op $rr)"
+            }
+        }
+
       case _ =>
         throw new CodeGenException("Unsupported temporal arithmetic.")
     }
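
A worked check of the minus semantics added above, in plain JVM arithmetic rather than generated code (ignoring DST, since the java.sql types below are interpreted in the local time zone): a TIMESTAMP difference is a millisecond count and a DATE difference is a day count, matching the 172800000 and 2 expected by the tests later in this PR.

  import java.sql.{Date, Timestamp}

  val millis = Timestamp.valueOf("2018-07-05 11:11:11").getTime -
               Timestamp.valueOf("2018-07-03 11:11:11").getTime        // 172800000L
  val days   = (Date.valueOf("2018-07-05").getTime -
                Date.valueOf("2018-07-03").getTime) / 86400000L        // 2L
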
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
new file mode 100644
index 00000000000..a8979ab3847
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+
+class TimestampDiffCallGen extends CallGenerator {
+
+  override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+    val unit = getEnum(operands.head).asInstanceOf[TimeUnit]
+    unit match {
+      case TimeUnit.YEAR |
+           TimeUnit.MONTH |
+           TimeUnit.QUARTER =>
+        (operands(1).resultType, operands(2).resultType) match {
+          case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}(${terms(1)},
+                  |  ${terms(2)} * ${MILLIS_PER_DAY}L) / ${unit.multiplier.intValue()}
+                  |""".stripMargin
+            }
+
+          case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}(
+                  |${terms(1)} * ${MILLIS_PER_DAY}L, ${terms(2)}) / ${unit.multiplier.intValue()}
+                  |""".stripMargin
+            }  
+
+          case _ =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}(${terms(1)},
+                  |  ${terms(2)}) / ${unit.multiplier.intValue()}
+                  |""".stripMargin
+            }
+        }   
+
+      case TimeUnit.WEEK |
+           TimeUnit.DAY |
+           TimeUnit.HOUR |
+           TimeUnit.MINUTE |
+           TimeUnit.SECOND =>
+        (operands(1).resultType, operands(2).resultType) match {
+          case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |(int)((${terms(1)} - ${terms(2)}) / ${unit.multiplier.intValue()})
+                  |""".stripMargin
+            }
+
+          case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |(int)((${terms(1)} -
+                  |  ${terms(2)} * ${MILLIS_PER_DAY}L) / ${unit.multiplier.intValue()})
+                  |""".stripMargin
+            }
+
+          case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |(int)((${terms(1)} * ${MILLIS_PER_DAY}L -
+                  |  ${terms(2)}) / ${unit.multiplier.intValue()})
+                  |""".stripMargin
+            }
+
+          case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE) =>
+            return generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+              (terms) =>
+                s"""
+                  |(int)((${terms(1)} * ${MILLIS_PER_DAY}L -
+                  |  ${terms(2)} * ${MILLIS_PER_DAY}L) / ${unit.multiplier.intValue()})
+                  |""".stripMargin
+            }
+        }
+
+      case _ =>
+        throw new ValidationException(
+          "unit " + unit + " can not be applied to TimestampDiff function")
+     }            
+
+  }
+
+}
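
For the "small" units (WEEK down to SECOND), the generated expressions above boil down to an integer division of the millisecond difference by TimeUnit.multiplier. A sketch of that arithmetic (an illustration of what the generated code computes, not the generator itself):

  import org.apache.calcite.avatica.util.TimeUnit

  def diffSketch(unit: TimeUnit, millis1: Long, millis2: Long): Int =
    ((millis1 - millis2) / unit.multiplier.longValue()).toInt

  // e.g. diffSketch(TimeUnit.HOUR, 25L * 3600000L, 0L) == 25, as in the HOUR test cases below.
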
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 4b2440cf673..f9033a7abc0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -55,6 +55,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
   lazy val TRIM: Keyword = Keyword("trim")
   lazy val EXTRACT: Keyword = Keyword("extract")
+  lazy val TIMESTAMPDIFF: Keyword = Keyword("timestampdiff")
   lazy val FLOOR: Keyword = Keyword("floor")
   lazy val CEIL: Keyword = Keyword("ceil")
   lazy val LOG: Keyword = Keyword("log")
@@ -369,6 +370,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
     }
 
+  lazy val prefixTimestampDiff: PackratParser[Expression] =
+    TIMESTAMPDIFF ~ "(" ~ timePointUnit ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ unit ~ _ ~ operand1 ~ _ ~ operand2 ~ _ => TimestampDiff(unit, operand1, operand2)
+    }
+
   lazy val prefixFloor: PackratParser[Expression] =
     FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
       case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
@@ -409,6 +415,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
     // expressions that take enumerations
     prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+    prefixTimestampDiff |
     // expressions that take literals
     prefixGet |
     // expression with special identifier
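
With the parser rule above, the string-based expression API accepts the new function as well. A hedged sketch, assuming a Table t with time attributes ts1 and ts2 (illustrative names):

  // Equivalent to the Scala DSL call timestampDiff(TimePointUnit.DAY, 'ts1, 'ts2)
  val projected = t.select("timestampDiff(DAY, ts1, ts2) as diffDays")
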
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
index 28049e8570e..764679b1466 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.typeutils.TypeCoercion
 import org.apache.flink.table.validate._
 
 import scala.collection.JavaConversions._
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 
 abstract class BinaryArithmetic extends BinaryExpression {
   private[flink] def sqlOperator: SqlOperator
@@ -95,7 +96,7 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
       ValidationFailure(
         s"$this requires Numeric, String, Intervals of same type, " +
         s"or Interval and a time point input, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
+        s"but get $left : ${left.resultType} and $right : ${right.resultType}")
     }
   }
 }
@@ -132,8 +133,16 @@ case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
       ValidationSuccess
    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
       ValidationSuccess
+    } else if (isTimePoint(left.resultType) && left.resultType != SqlTimeTypeInfo.TIME
+        && left.resultType == right.resultType) {
+      ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
     } else {
-      super.validateInput()
+      ValidationFailure(
+        s"$this requires Numeric, Intervals of same type, Interval and a time point input"
+
+        s"or time points input of the same timestamp or date, " +
+        s"but get $left : ${left.resultType} and $right : ${right.resultType}")
     }
   }
 }
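
A brief sketch of what the relaxed Minus validation above accepts and still rejects in the Scala expression DSL (it assumes the implicits from org.apache.flink.table.api.scala._):

  import org.apache.flink.table.api.scala._

  // Accepted: two time points of the same type, other than TIME.
  val tsMinusTs     = "2018-07-05 11:11:11".toTimestamp - "2018-07-03 11:11:11".toTimestamp
  val dateMinusDate = "2018-07-05".toDate - "2018-07-03".toDate

  // Still rejected at validation time: TIME minus TIME, or a DATE mixed with a TIMESTAMP
  // via plain "-" (the mixed case is what timestampDiff(...) is for).
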
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index ac996f6a101..3af8f53e0f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -18,9 +18,12 @@
 
 package org.apache.flink.table.expressions
 
-import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
@@ -340,3 +343,55 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+    timePointUnit: Expression,
+    timestamp1: Expression,
+    timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    timePointUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp1 is of type ${timestamp1.resultType}")
+    }
+
+    if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp2 is of type ${timestamp2.resultType}")
+    }
+
+    timePointUnit match {
+      case SymbolExpression(TimePointUnit.YEAR)
+           | SymbolExpression(TimePointUnit.QUARTER)
+           | SymbolExpression(TimePointUnit.MONTH)
+           | SymbolExpression(TimePointUnit.WEEK)
+           | SymbolExpression(TimePointUnit.DAY)
+           | SymbolExpression(TimePointUnit.HOUR)
+           | SymbolExpression(TimePointUnit.MINUTE)
+           | SymbolExpression(TimePointUnit.SECOND)
+        if timestamp1.resultType == SqlTimeTypeInfo.DATE
+          || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || timestamp2.resultType == SqlTimeTypeInfo.DATE
+          || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"TimestampDiff operator does not support unit '$timePointUnit'"
+
+            s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').")
+    }
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+    .getRexBuilder
+    .makeCall(SqlStdOperatorTable.TIMESTAMP_DIFF,
+       Seq(timePointUnit.toRexNode, timestamp2.toRexNode, timestamp1.toRexNode))
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index f5d13305349..9e8a1a076fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -253,6 +253,7 @@ object FunctionCatalog {
     "temporalOverlaps" -> classOf[TemporalOverlaps],
     "dateTimePlus" -> classOf[Plus],
     "dateFormat" -> classOf[DateFormat],
+    "timestampDiff" -> classOf[TimestampDiff],
 
     // item
     "at" -> classOf[ItemAt],
@@ -442,6 +443,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.BIN,
     ScalarSqlFunctions.HEX,
     SqlStdOperatorTable.TIMESTAMP_ADD,
+    SqlStdOperatorTable.TIMESTAMP_DIFF,
     ScalarSqlFunctions.LOG,
     ScalarSqlFunctions.LPAD,
     ScalarSqlFunctions.RPAD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 145f3c5fba3..13a8c042b8d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -2069,6 +2069,139 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "false")
   }
 
+  @Test
+  def testTimestampDiff(): Unit = {
+    val dataMap = Map(
+      ("DAY", TimePointUnit.DAY, "SQL_TSI_DAY") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-05 11:11:11", "2"), //timestamp, timestamp
+        ("2016-06-15", "2016-06-16 11:11:11", "1"), //date, timestamp
+        ("2016-06-15 11:00:00", "2016-06-19", "3"), //timestamp, date
+        ("2016-06-15", "2016-06-18", "3") //date, date
+      ),
+      ("HOUR", TimePointUnit.HOUR, "SQL_TSI_HOUR") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-04 12:12:11", "25"),
+        ("2016-06-15", "2016-06-16 11:11:11", "35"),
+        ("2016-06-15 11:00:00", "2016-06-19", "85"),
+        ("2016-06-15", "2016-06-12", "-72")
+      ),
+      ("MINUTE", TimePointUnit.MINUTE, "SQL_TSI_MINUTE") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 12:10:11", "59"),
+        ("2016-06-15", "2016-06-16 11:11:11", "2111"),
+        ("2016-06-15 11:00:00", "2016-06-19", "5100"),
+        ("2016-06-15", "2016-06-18", "4320")
+      ),
+      ("SECOND", TimePointUnit.SECOND, "SQL_TSI_SECOND") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 11:12:12", "61"),
+        ("2016-06-15", "2016-06-16 11:11:11", "126671"),
+        ("2016-06-15 11:00:00", "2016-06-19", "306000"),
+        ("2016-06-15", "2016-06-18", "259200")
+      ),
+      ("WEEK", TimePointUnit.WEEK, "SQL_TSI_WEEK") -> Seq(
+        ("2018-05-03 11:11:11", "2018-07-03 11:12:12", "8"),
+        ("2016-04-15", "2016-07-16 11:11:11", "13"),
+        ("2016-04-15 11:00:00", "2016-09-19", "22"),
+        ("2016-08-15", "2016-06-18", "-8")
+      ),
+      ("MONTH", TimePointUnit.MONTH, "SQL_TSI_MONTH") -> Seq(
+        ("2018-07-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "24"),
+        ("2016-06-15 11:00:00", "2018-05-19", "23"),
+        ("2016-06-15", "2018-03-18", "21")
+      ),
+      ("QUARTER", TimePointUnit.QUARTER, "SQL_TSI_QUARTER") -> Seq(
+        ("2018-01-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "8"),
+        ("2016-06-15 11:00:00", "2018-05-19", "7"),
+        ("2016-06-15", "2018-03-18", "7")
+      ),
+      ("YEAR", TimePointUnit.YEAR, "SQL_TSI_YEAR") -> Seq(
+        ("2016-01-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2010-06-15", "2018-06-16 11:11:11", "8"),
+        ("2016-06-15 11:00:00", "2018-05-19", "1"),
+        ("2018-06-15", "2011-03-18", "-7")
+      )
+    )
+
+    for ((unitparts, dataparts) <- dataMap) {
+      for ((data,index) <- dataparts.zipWithIndex) {
+        index match {
+          case 0 => // timestamp, timestamp
+            testAllApis(
+              timestampDiff(unitparts._2, data._1.toTimestamp, data._2.toTimestamp),
+              s"timestampDiff(${unitparts._1}, '${data._1}'.toTimestamp, '${data._2}'.toTimestamp)",
+              s"TIMESTAMPDIFF(${unitparts._1}, TIMESTAMP '${data._1}', TIMESTAMP '${data._2}')",
+              data._3
+            )
+            testSqlApi(  // sql tsi
+              s"TIMESTAMPDIFF(${unitparts._3}, TIMESTAMP '${data._1}', TIMESTAMP '${data._2}')",
+              data._3
+            )
+          case 1 => // date, timestamp
+            testAllApis(
+              timestampDiff(unitparts._2, data._1.toDate, data._2.toTimestamp),
+              s"timestampDiff(${unitparts._1}, '${data._1}'.toDate, '${data._2}'.toTimestamp)",
+              s"TIMESTAMPDIFF(${unitparts._1}, DATE '${data._1}', TIMESTAMP '${data._2}')",
+              data._3
+            )
+            testSqlApi( // sql tsi
+              s"TIMESTAMPDIFF(${unitparts._3}, DATE '${data._1}', TIMESTAMP '${data._2}')",
+              data._3
+            )
+          case 2 => // timestamp, date
+            testAllApis(
+              timestampDiff(unitparts._2, data._1.toTimestamp, data._2.toDate),
+              s"timestampDiff(${unitparts._1}, '${data._1}'.toTimestamp, '${data._2}'.toDate)",
+              s"TIMESTAMPDIFF(${unitparts._1}, TIMESTAMP '${data._1}', DATE '${data._2}')",
+              data._3
+            )
+            testSqlApi( // sql tsi
+              s"TIMESTAMPDIFF(${unitparts._3}, TIMESTAMP '${data._1}', DATE '${data._2}')",
+              data._3
+            )
+          case 3 => // date, date
+            testAllApis(
+              timestampDiff(unitparts._2, data._1.toDate, data._2.toDate),
+              s"timestampDiff(${unitparts._1}, '${data._1}'.toDate, '${data._2}'.toDate)",
+              s"TIMESTAMPDIFF(${unitparts._1}, DATE '${data._1}', DATE '${data._2}')",
+              data._3
+            )
+            testSqlApi( // sql tsi
+              s"TIMESTAMPDIFF(${unitparts._3}, DATE '${data._1}', DATE '${data._2}')",
+              data._3
+            )
+        }
+      }
+    }
+
+    testTableApi(
+      "2018-07-05 11:11:11".toTimestamp - "2018-07-03 11:11:11".toTimestamp,
+      "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03 11:11:11'.toTimestamp",
+      "172800000"
+    )
+
+    testTableApi(
+      "2018-07-05".toDate - "2018-07-03".toDate,
+      "'2018-07-05'.toDate - '2018-07-03'.toDate",
+      "2"
+    )
+
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP),
+        "2016-02-24 12:42:25".toTimestamp),
+      "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
+      "TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')",
+      "null"
+    )
+
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp,
+        Null(Types.SQL_TIMESTAMP)),
+      "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp,  Null(SQL_TIMESTAMP))",
+      "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25',  CAST(NULL AS TIMESTAMP))",
+      "null"
+    )
+  }
+
   @Test
   def testTimestampAdd(): Unit = {
     val data = Seq(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index 05e4e9bfc99..d35b48a8579 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.expressions.validation
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.flink.table.api.{SqlParserException, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.{TimeIntervalUnit, TimePointUnit}
 import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
@@ -98,6 +99,18 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTime(): Unit = {
+    testTableApi(
+      timestampDiff(TimePointUnit.DAY, "2016-02-24", "2016-02-27"), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTimeAndUnit(): Unit = {
+    testTableApi(
+      timestampDiff(TimePointUnit.MINUTE, "2016-02-24", "2016-02-27"), "FAIL", "FAIL")
+  }
+
   @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add TIMESTAMPDIFF supported in TableAPI
> ---------------------------------------
>
>                 Key: FLINK-6847
>                 URL: https://issues.apache.org/jira/browse/FLINK-6847
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Priority: Major
>              Labels: pull-request-available, starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
