Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 03E8B200C38 for ; Wed, 1 Mar 2017 00:43:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 02866160B7E; Tue, 28 Feb 2017 23:43:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 01254160B7C for ; Wed, 1 Mar 2017 00:43:06 +0100 (CET) Received: (qmail 21261 invoked by uid 500); 28 Feb 2017 23:43:06 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 21252 invoked by uid 99); 28 Feb 2017 23:43:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Feb 2017 23:43:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1932BDFDB1; Tue, 28 Feb 2017 23:43:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Message-Id: <02bf6db850bb4792a3895ad81bdc3dfe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5921] [table] Add custom data types for rowtime and proctime. Date: Tue, 28 Feb 2017 23:43:06 +0000 (UTC) archived-at: Tue, 28 Feb 2017 23:43:08 -0000 Repository: flink Updated Branches: refs/heads/master 3086af534 -> 2a1a9c1e3 [FLINK-5921] [table] Add custom data types for rowtime and proctime. - proctime() and rowtime() are translated to constant zero timestamp. This closes #3425. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a1a9c1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a1a9c1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a1a9c1e Branch: refs/heads/master Commit: 2a1a9c1e31faedb76c990a4f9405837600d770f8 Parents: 3086af5 Author: Fabian Hueske Authored: Sun Feb 26 00:28:54 2017 +0100 Committer: Fabian Hueske Committed: Wed Mar 1 00:42:19 2017 +0100 ---------------------------------------------------------------------- .../table/codegen/calls/FunctionGenerator.scala | 14 +++- .../functions/TimeModeIndicatorFunctions.scala | 80 +++++++++++++++++++- .../datastream/LogicalWindowAggregateRule.scala | 71 +++++++++-------- .../scala/stream/sql/WindowAggregateTest.scala | 8 +- 4 files changed, 135 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index dfc9055..7d55957 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -28,7 +28,9 @@ import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction} 
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression} +import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor} +import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} import scala.collection.mutable @@ -327,6 +329,15 @@ object FunctionGenerator { ) ) + // generate a constant for time indicator functions. + // this is a temporary solution and will be removed when FLINK-5884 is implemented. + case ProcTimeExtractor | EventTimeExtractor => + Some(new CallGenerator { + override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = { + GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP) + } + }) + // built-in scalar function case _ => sqlFunctions.get((sqlOperator, operandTypes)) @@ -336,6 +347,7 @@ object FunctionGenerator { case (x: BasicTypeInfo[_], y: BasicTypeInfo[_]) => y.shouldAutocastTo(x) || x == y case _ => false }).map(_._2)) + } // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala index b9b66ea..3ddcbdc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala @@ -17,8 +17,12 @@ */ package org.apache.flink.table.functions +import java.nio.charset.Charset +import java.util + +import org.apache.calcite.rel.`type`._ import org.apache.calcite.sql._ -import 
org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName} +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName} import org.apache.calcite.sql.validate.SqlMonotonicity import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo @@ -26,7 +30,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.expressions.LeafExpression object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC, + ReturnTypes.explicit(TimeModeTypes.ROWTIME), null, OperandTypes.NILADIC, SqlFunctionCategory.SYSTEM) { override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION @@ -35,7 +39,7 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION, } object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC, + ReturnTypes.explicit(TimeModeTypes.PROCTIME), null, OperandTypes.NILADIC, SqlFunctionCategory.SYSTEM) { override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION @@ -61,3 +65,73 @@ abstract class TimeIndicator extends LeafExpression { case class RowTime() extends TimeIndicator case class ProcTime() extends TimeIndicator + +object TimeModeTypes { + + // indicator data type for row time (event time) + val ROWTIME = new RowTimeType + // indicator data type for processing time + val PROCTIME = new ProcTimeType + +} + +class RowTimeType extends TimeModeType { + + override def toString(): String = "ROWTIME" + override def getFullTypeString: String = "ROWTIME_INDICATOR" +} + +class ProcTimeType extends TimeModeType { + + override def toString(): String = "PROCTIME" + override def getFullTypeString: String = "PROCTIME_INDICATOR" +} + +abstract class TimeModeType extends RelDataType { + + override def getComparability: RelDataTypeComparability = RelDataTypeComparability.NONE 
+ + override def isStruct: Boolean = false + + override def getFieldList: util.List[RelDataTypeField] = null + + override def getFieldNames: util.List[String] = null + + override def getFieldCount: Int = 0 + + override def getStructKind: StructKind = StructKind.NONE + + override def getField( + fieldName: String, + caseSensitive: Boolean, + elideRecord: Boolean): RelDataTypeField = null + + override def isNullable: Boolean = false + + override def getComponentType: RelDataType = null + + override def getKeyType: RelDataType = null + + override def getValueType: RelDataType = null + + override def getCharset: Charset = null + + override def getCollation: SqlCollation = null + + override def getIntervalQualifier: SqlIntervalQualifier = null + + override def getPrecision: Int = -1 + + override def getScale: Int = -1 + + override def getSqlTypeName: SqlTypeName = SqlTypeName.TIMESTAMP + + override def getSqlIdentifier: SqlIdentifier = null + + override def getFamily: RelDataTypeFamily = SqlTypeFamily.NUMERIC + + override def getPrecedenceList: RelDataTypePrecedenceList = ??? 
+ + override def isDynamicStruct: Boolean = false + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala index f5eb5f9..37a1b7d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala @@ -17,22 +17,19 @@ */ package org.apache.flink.table.plan.rules.datastream -import java.util.Calendar - import com.google.common.collect.ImmutableList import org.apache.calcite.avatica.util.TimeUnitRange import org.apache.calcite.plan._ import org.apache.calcite.plan.hep.HepRelVertex import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} -import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlFloorFunction import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.api.scala.Tumble -import org.apache.flink.table.api.{TableException, TumblingWindow, Window} +import org.apache.flink.table.api.{EventTimeWindow, TableException, TumblingWindow, Window} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor} +import org.apache.flink.table.functions.TimeModeTypes import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import 
org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -68,7 +65,18 @@ class LogicalWindowAggregateRule val builder = call.builder() val rexBuilder = builder.getRexBuilder - val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3) + + // build dummy literal with type depending on time semantics + val zero = window match { + case _: EventTimeWindow => + rexBuilder.makeAbstractCast( + TimeModeTypes.ROWTIME, + rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true)) + case _ => + rexBuilder.makeAbstractCast( + TimeModeTypes.PROCTIME, + rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true)) + } val newAgg = builder .push(project.getInput) @@ -90,52 +98,58 @@ class LogicalWindowAggregateRule private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = { val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject] - val key = agg.getGroupSet.asList() - val fields = key.flatMap(x => nodeToMaybeWindow(project.getProjects.get(x)) match { - case Some(w) => Some(x.toInt, w) - case _ => None - }) - fields.size match { + val groupKeys = agg.getGroupSet + + // filter expressions on which is grouped + val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2)) + + // check for window expressions in group expressions + val windowExpr = groupExpr + .map(g => (g._2, identifyWindow(g._1)) ) + .filter(_._2.isDefined) + .map(g => (g._1, g._2.get) ) + + windowExpr.size match { case 0 => None - case 1 => Some(fields.head) + case 1 => Some(windowExpr.head) case _ => throw new TableException("Multiple windows are not supported") } } - private def nodeToMaybeWindow(field: RexNode): Option[Window] = { + private def identifyWindow(field: RexNode): Option[Window] = { + // Detects window expressions by pattern matching + // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), + // with time being equal to proctime() or rowtime() field match { case call: RexCall => 
call.getOperator match { case _: SqlFloorFunction => - val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1)) + val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] + val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) - return LogicalWindowAggregateRule.decorateTimeIndicator( - call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w) + call.getType match { + case TimeModeTypes.PROCTIME => + return Some(w) + case TimeModeTypes.ROWTIME => + return Some(w.on("rowtime")) + case _ => + } case _ => } case _ => } None } + } object LogicalWindowAggregateRule { - private[flink] val TIMESTAMP_ZERO = Calendar.getInstance() - TIMESTAMP_ZERO.setTimeInMillis(0) private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) private[flink] val INSTANCE = new LogicalWindowAggregateRule - private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = { - operator match { - case EventTimeExtractor => Some(window.on("rowtime")) - case ProcTimeExtractor => Some(window) - case _ => None - } - } - private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = { intervalToTumbleWindow(range.startUnit.multiplier.longValue()) } @@ -144,8 +158,5 @@ object LogicalWindowAggregateRule { Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS) } - private def getLiteral[T](node: RexNode): T = { - node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a1a9c1e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 06088ab..c1d39aa 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -40,7 +40,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "1970-01-01 00:00:00 AS $f0") + term("select", "CAST(1970-01-01 00:00:00) AS $f0") ), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)), term("select", "COUNT(*) AS EXPR$0") @@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "1970-01-01 00:00:00 AS $f1") + term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1") ), term("groupBy", "a"), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)), @@ -83,7 +83,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c") + term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1, b, c") ), term("groupBy", "a, b"), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)), @@ -105,7 +105,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "1970-01-01 00:00:00 AS $f0") + term("select", "CAST(1970-01-01 00:00:00) AS $f0") ), term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)), term("select", "COUNT(*) AS EXPR$0")