From: fhueske@apache.org
To: commits@flink.apache.org
Date: Fri, 12 May 2017 06:11:06 -0000
Message-Id: <24d72589853c4fbfaaed6da283d76e32@git.apache.org>
In-Reply-To: <8e8c202284624a74b330e9ed46e88cd4@git.apache.org>
References: <8e8c202284624a74b330e9ed46e88cd4@git.apache.org>
Subject: [5/5] flink git commit: [FLINK-6483] [table] Add materialization of time indicators.

[FLINK-6483] [table] Add materialization of time indicators.

This closes #3862.
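----------------------------------------------------------------------

The user-facing effect of the change: a stream table can declare logical rowtime/proctime attributes, and as soon as a query uses such an attribute in a calculation or projection, the planner materializes it into a regular SQL TIMESTAMP taken from the record timestamp or the current processing time. A minimal usage sketch against the Flink 1.3-era Scala Table API (the field names 'ts and 'name are illustrative, not taken from this commit):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// a rowtime attribute now requires the EventTime characteristic;
// this is checked when the table or table source is registered
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)

val stream = env
  .fromElements((1000L, "hello"), (2000L, "world"))
  .assignAscendingTimestamps(_._1)

// 'rowtime.rowtime appends a logical event-time indicator field
val table = stream.toTable(tEnv, 'ts, 'name, 'rowtime.rowtime)

// projecting the indicator forces its materialization as a TIMESTAMP
val result = table.select('rowtime, 'name)

----------------------------------------------------------------------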
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b50ef4b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b50ef4b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b50ef4b8
Branch: refs/heads/master
Commit: b50ef4b8de73e0e19df154d87ea588236e3ccb43
Parents: 2480887
Author: twalthr
Authored: Wed May 10 10:11:34 2017 +0200
Committer: Fabian Hueske
Committed: Fri May 12 08:09:54 2017 +0200
----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      |  39 +-
 .../calcite/RelTimeIndicatorConverter.scala     | 404 +++++++++++++------
 .../flink/table/codegen/CodeGenerator.scala     |  47 ++-
 .../flink/table/plan/nodes/CommonCalc.scala     |  13 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  19 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |   6 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |   8 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  14 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  23 +-
 .../datastream/StreamTableSourceScan.scala      |   4 +-
 .../plan/nodes/logical/FlinkLogicalCalc.scala   |   2 +-
 .../logical/FlinkLogicalTableSourceScan.scala   |   6 +-
 .../DataStreamLogicalWindowAggregateRule.scala  |  14 +-
 .../flink/table/plan/schema/RowSchema.scala     |  11 +-
 .../plan/schema/StreamTableSourceTable.scala    |   8 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |  83 ----
 .../runtime/CRowCorrelateProcessRunner.scala    |  91 +++++
 .../flink/table/runtime/CRowFlatMapRunner.scala |  72 ----
 .../flink/table/runtime/CRowProcessRunner.scala |  80 ++++
 .../table/sources/DefinedTimeAttributes.scala   |  60 ---
 .../table/sources/definedTimeAttributes.scala   |  60 +++
 .../stream/StreamTableEnvironmentTest.scala     |  10 +-
 .../api/scala/stream/TableSourceTest.scala      |  22 +-
 .../calcite/RelTimeIndicatorConverterTest.scala | 351 ++++++++++++++++
 .../datastream/TimeAttributesITCase.scala       | 237 +++++++++++
 .../flink/table/utils/TableTestBase.scala       |   5 +
 26 files changed, 1267 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index d68da04..994ac80 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -18,38 +18,37 @@ package org.apache.flink.table.api -import _root_.java.util.concurrent.atomic.AtomicInteger import _root_.java.lang.{Boolean => JBool} +import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.plan.hep.HepMatchOrder -import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} -import org.apache.flink.api.common.typeinfo.AtomicType -import
org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference} import org.apache.flink.table.plan.nodes.FlinkConventions -import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait} -import org.apache.flink.table.plan.nodes.datastream._ +import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable} -import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner} import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} -import org.apache.flink.table.sources.{StreamTableSource, TableSource} +import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TypeCheckUtils import org.apache.flink.types.Row @@ -111,6 +110,17 @@ abstract class StreamTableEnvironment( override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { checkValidTableName(name) + // check if event-time is enabled + tableSource match { + case dra: DefinedRowtimeAttribute if + execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime => + + throw TableException( + s"A rowtime attribute requires an EventTime time characteristic in stream environment. " + + s"But is: ${execEnv.getStreamTimeCharacteristic}") + case _ => // ok + } + tableSource match { case streamTableSource: StreamTableSource[_] => registerTableInternal(name, new StreamTableSourceTable(streamTableSource)) @@ -390,6 +400,13 @@ abstract class StreamTableEnvironment( // validate and extract time attributes val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields) + // check if event-time is enabled + if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) { + throw TableException( + s"A rowtime attribute requires an EventTime time characteristic in stream environment. " + + s"But is: ${execEnv.getStreamTimeCharacteristic}") + } + val dataStreamTable = new DataStreamTable[T]( dataStream, fieldIndexes, @@ -518,9 +535,9 @@ abstract class StreamTableEnvironment( // 3. 
normalize the logical plan val normRuleSet = getNormRuleSet val normalizedPlan = if (normRuleSet.iterator().hasNext) { - runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet) + runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet) } else { - decorPlan + convPlan } // 4. optimize the logical Flink plan http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index fa2e3ee..7ceb397 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -18,73 +18,43 @@ package org.apache.flink.table.calcite -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core._ import org.apache.calcite.rel.logical._ -import org.apache.calcite.rel.{RelNode, RelShuttleImpl} +import org.apache.calcite.rel.{RelNode, RelShuttle} import org.apache.calcite.rex._ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType +import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.functions.TimeMaterializationSqlFunction import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import scala.collection.JavaConversions._ +import scala.collection.mutable /** * Traverses a [[RelNode]] tree and converts fields with [[TimeIndicatorRelDataType]] type. If a * time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed in * some cases, but not all. 
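----------------------------------------------------------------------

The rewrite rule is easier to see in isolation: wherever an expression consumes an operand whose type is still a time indicator, the operand is wrapped in a materialization call, except under the group-window operators (TUMBLE, HOP, SESSION), which must see the raw indicator. A deliberately simplified standalone model of that decision, using hypothetical case classes rather than Calcite's RexNode API:

// hypothetical, simplified expression tree -- not Calcite's API
sealed trait Expr { def isTimeIndicator: Boolean = false }
case class FieldRef(index: Int, timeIndicator: Boolean) extends Expr {
  override def isTimeIndicator: Boolean = timeIndicator
}
case class Call(op: String, operands: Seq[Expr]) extends Expr
// stands in for a TimeMaterializationSqlFunction call; its result is a
// plain TIMESTAMP, so isTimeIndicator stays false
case class Materialize(operand: Expr) extends Expr

val windowOps = Set("TUMBLE", "HOP", "SESSION")

def rewrite(expr: Expr): Expr = expr match {
  // group-window functions consume the raw time indicator
  case Call(op, operands) if windowOps(op) =>
    Call(op, operands.map(rewrite))
  // all other calls get materialized timestamps as operands
  case Call(op, operands) =>
    Call(op, operands.map(rewrite).map {
      case o if o.isTimeIndicator => Materialize(o)
      case o => o
    })
  case leaf => leaf
}

The actual RexTimeIndicatorMaterializer below additionally rewrites call return types and resolves references against inputs that were already materialized.

----------------------------------------------------------------------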
*/ -class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl { +class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { - override def visit(project: LogicalProject): RelNode = { - // visit children and update inputs - val updatedProject = super.visit(project).asInstanceOf[LogicalProject] + private val timestamp = rexBuilder + .getTypeFactory + .asInstanceOf[FlinkTypeFactory] + .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP) - // check if input field contains time indicator type - // materialize field if no time indicator is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - updatedProject.getInput.getRowType.getFieldList.map(_.getType)) - val newProjects = updatedProject.getProjects.map(_.accept(materializer)) - - // copy project - updatedProject.copy( - updatedProject.getTraitSet, - updatedProject.getInput, - newProjects, - buildRowType(updatedProject.getRowType.getFieldNames, newProjects.map(_.getType)) - ) - } - - override def visit(filter: LogicalFilter): RelNode = { - // visit children and update inputs - val updatedFilter = super.visit(filter).asInstanceOf[LogicalFilter] - - // check if input field contains time indicator type - // materialize field if no time indicator is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - updatedFilter.getInput.getRowType.getFieldList.map(_.getType)) - val newCondition = updatedFilter.getCondition.accept(materializer) - - // copy filter - updatedFilter.copy( - updatedFilter.getTraitSet, - updatedFilter.getInput, - newCondition - ) - } + override def visit(intersect: LogicalIntersect): RelNode = + throw new TableException("Logical intersect in a stream environment is not supported yet.") override def visit(union: LogicalUnion): RelNode = { // visit children and update inputs - val updatedUnion = super.visit(union).asInstanceOf[LogicalUnion] + val inputs = union.getInputs.map(_.accept(this)) // make sure that time indicator types match - val inputTypes = updatedUnion.getInputs.map(_.getRowType) + val inputTypes = inputs.map(_.getRowType) val head = inputTypes.head.getFieldList.map(_.getType) @@ -114,101 +84,269 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl { "Union fields with time attributes have different types.") } - updatedUnion + LogicalUnion.create(inputs, union.all) } + override def visit(aggregate: LogicalAggregate): RelNode = convertAggregate(aggregate) + + override def visit(minus: LogicalMinus): RelNode = + throw new TableException("Logical minus in a stream environment is not supported yet.") + + override def visit(sort: LogicalSort): RelNode = + throw new TableException("Logical sort in a stream environment is not supported yet.") + + override def visit(`match`: LogicalMatch): RelNode = + throw new TableException("Logical match in a stream environment is not supported yet.") + override def visit(other: RelNode): RelNode = other match { - case scan: LogicalTableFunctionScan if - stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] => + + case uncollect: Uncollect => // visit children and update inputs - val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan] - - val correlate = stack.peek().asInstanceOf[LogicalCorrelate] - - // check if input field contains time indicator type - // materialize field if no time indicator 
is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType)) - val newCall = updatedScan.getCall.accept(materializer) - - // copy scan - updatedScan.copy( - updatedScan.getTraitSet, - updatedScan.getInputs, - newCall, - updatedScan.getElementType, - updatedScan.getRowType, - updatedScan.getColumnMappings - ) + val input = uncollect.getInput.accept(this) + Uncollect.create(uncollect.getTraitSet, input, uncollect.withOrdinality) + + case scan: LogicalTableFunctionScan => + scan + + case aggregate: LogicalWindowAggregate => + val convAggregate = convertAggregate(aggregate) + + LogicalWindowAggregate.create( + aggregate.getWindow, + aggregate.getNamedProperties, + convAggregate) case _ => - super.visit(other) + throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}") } - private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = { - val fields = names.zipWithIndex.map { case (name, idx) => - new RelDataTypeFieldImpl(name, idx, types(idx)) - } - new RelRecordType(StructKind.FULLY_QUALIFIED, fields) + + override def visit(exchange: LogicalExchange): RelNode = + throw new TableException("Logical exchange in a stream environment is not supported yet.") + + override def visit(scan: TableScan): RelNode = scan + + override def visit(scan: TableFunctionScan): RelNode = + throw new TableException("Table function scan in a stream environment is not supported yet.") + + override def visit(values: LogicalValues): RelNode = values + + override def visit(filter: LogicalFilter): RelNode = { + // visit children and update inputs + val input = filter.getInput.accept(this) + + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + input.getRowType.getFieldList.map(_.getType)) + + val condition = filter.getCondition.accept(materializer) + LogicalFilter.create(input, condition) } -} -class RexTimeIndicatorMaterializer( - private val rexBuilder: RexBuilder, - private val input: Seq[RelDataType]) - extends RexShuttle { - - val timestamp = rexBuilder - .getTypeFactory - .asInstanceOf[FlinkTypeFactory] - .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP) - - override def visitInputRef(inputRef: RexInputRef): RexNode = { - // reference is interesting - if (isTimeIndicatorType(inputRef.getType)) { - val resolvedRefType = input(inputRef.getIndex) - // input is a valid time indicator - if (isTimeIndicatorType(resolvedRefType)) { - inputRef - } - // input has been materialized - else { - new RexInputRef(inputRef.getIndex, resolvedRefType) - } - } - // reference is a regular field - else { - super.visitInputRef(inputRef) + override def visit(project: LogicalProject): RelNode = { + // visit children and update inputs + val input = project.getInput.accept(this) + + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + input.getRowType.getFieldList.map(_.getType)) + + val projects = project.getProjects.map(_.accept(materializer)) + val fieldNames = project.getRowType.getFieldNames + 
LogicalProject.create(input, projects, fieldNames) + } + + override def visit(join: LogicalJoin): RelNode = + throw new TableException("Logical join in a stream environment is not supported yet.") + + override def visit(correlate: LogicalCorrelate): RelNode = { + // visit children and update inputs + val inputs = correlate.getInputs.map(_.accept(this)) + + val right = inputs(1) match { + case scan: LogicalTableFunctionScan => + // visit children and update inputs + val scanInputs = scan.getInputs.map(_.accept(this)) + + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + inputs.head.getRowType.getFieldList.map(_.getType)) + + val call = scan.getCall.accept(materializer) + LogicalTableFunctionScan.create( + scan.getCluster, + scanInputs, + call, + scan.getElementType, + scan.getRowType, + scan.getColumnMappings) + + case _ => + inputs(1) } + + LogicalCorrelate.create( + inputs.head, + right, + correlate.getCorrelationId, + correlate.getRequiredColumns, + correlate.getJoinType) } - override def visitCall(call: RexCall): RexNode = { - val updatedCall = super.visitCall(call).asInstanceOf[RexCall] + private def convertAggregate(aggregate: Aggregate): LogicalAggregate = { + // visit children and update inputs + val input = aggregate.getInput.accept(this) + + // add a project to materialize aggregation arguments/grouping keys + + val refIndices = mutable.Set[Int]() + + // check arguments of agg calls + aggregate.getAggCallList.foreach(call => if (call.getArgList.size() == 0) { + // count(*) has an empty argument list + (0 until input.getRowType.getFieldCount).foreach(refIndices.add) + } else { + // for other aggregations + call.getArgList.map(_.asInstanceOf[Int]).foreach(refIndices.add) + }) - // skip materialization for special operators - updatedCall.getOperator match { - case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE => - return updatedCall + // check grouping sets + aggregate.getGroupSets.foreach(set => + set.asList().map(_.asInstanceOf[Int]).foreach(refIndices.add) + ) - case _ => // do nothing + val needsMaterialization = refIndices.exists(idx => + isTimeIndicatorType(input.getRowType.getFieldList.get(idx).getType)) + + // create project if necessary + val projectedInput = if (needsMaterialization) { + + // insert or merge with input project if + // a time attribute is accessed and needs to be materialized + input match { + + // merge + case lp: LogicalProject => + val projects = lp.getProjects.zipWithIndex.map { case (expr, idx) => + if (isTimeIndicatorType(expr.getType) && refIndices.contains(idx)) { + rexBuilder.makeCall( + TimeMaterializationSqlFunction, + expr) + } else { + expr + } + } + + LogicalProject.create( + lp.getInput, + projects, + input.getRowType.getFieldNames) + + // new project + case _ => + val projects = input.getRowType.getFieldList.map { field => + if (isTimeIndicatorType(field.getType) && refIndices.contains(field.getIndex)) { + rexBuilder.makeCall( + TimeMaterializationSqlFunction, + new RexInputRef(field.getIndex, field.getType)) + } else { + new RexInputRef(field.getIndex, field.getType) + } + } + + LogicalProject.create( + input, + projects, + input.getRowType.getFieldNames) + } + } else { + // no project necessary + input } - // materialize operands with time indicators - val materializedOperands = 
updatedCall.getOperands.map { o => - if (isTimeIndicatorType(o.getType)) { - rexBuilder.makeCall(TimeMaterializationSqlFunction, o) + // remove time indicator type as agg call return type + val updatedAggCalls = aggregate.getAggCallList.map { call => + val callType = if (isTimeIndicatorType(call.getType)) { + timestamp } else { - o + call.getType } + AggregateCall.create( + call.getAggregation, + call.isDistinct, + call.getArgList, + call.filterArg, + callType, + call.name) } - // remove time indicator return type - if (isTimeIndicatorType(updatedCall.getType)) { - updatedCall.clone(timestamp, materializedOperands) - } else { - updatedCall.clone(updatedCall.getType, materializedOperands) + LogicalAggregate.create( + projectedInput, + aggregate.indicator, + aggregate.getGroupSet, + aggregate.getGroupSets, + updatedAggCalls) + } + + class RexTimeIndicatorMaterializer( + private val rexBuilder: RexBuilder, + private val input: Seq[RelDataType]) + extends RexShuttle { + + override def visitInputRef(inputRef: RexInputRef): RexNode = { + // reference is interesting + if (isTimeIndicatorType(inputRef.getType)) { + val resolvedRefType = input(inputRef.getIndex) + // input is a valid time indicator + if (isTimeIndicatorType(resolvedRefType)) { + inputRef + } + // input has been materialized + else { + new RexInputRef(inputRef.getIndex, resolvedRefType) + } + } + // reference is a regular field + else { + super.visitInputRef(inputRef) + } + } + + override def visitCall(call: RexCall): RexNode = { + val updatedCall = super.visitCall(call).asInstanceOf[RexCall] + + // materialize operands with time indicators + val materializedOperands = updatedCall.getOperator match { + + // skip materialization for special operators + case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE => + updatedCall.getOperands.toList + + case _ => + updatedCall.getOperands.map { o => + if (isTimeIndicatorType(o.getType)) { + rexBuilder.makeCall(TimeMaterializationSqlFunction, o) + } else { + o + } + } + } + + // remove time indicator return type + if (isTimeIndicatorType(updatedCall.getType)) { + updatedCall.clone(timestamp, materializedOperands) + } else { + updatedCall.clone(updatedCall.getType, materializedOperands) + } } } } @@ -217,6 +355,30 @@ object RelTimeIndicatorConverter { def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode = { val converter = new RelTimeIndicatorConverter(rexBuilder) - rootRel.accept(converter) + val convertedRoot = rootRel.accept(converter) + + var needsConversion = false + + // materialize all remaining time indicators + val projects = convertedRoot.getRowType.getFieldList.map(field => + if (isTimeIndicatorType(field.getType)) { + needsConversion = true + rexBuilder.makeCall( + TimeMaterializationSqlFunction, + new RexInputRef(field.getIndex, field.getType)) + } else { + new RexInputRef(field.getIndex, field.getType) + } + ) + + // add final conversion + if (needsConversion) { + LogicalProject.create( + convertedRoot, + projects, + convertedRoot.getRowType.getFieldNames) + } else { + convertedRoot + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 25addbc..036889f 
100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ @@ -238,6 +239,11 @@ class CodeGenerator( var outRecordTerm = "out" /** + * @return term of the [[ProcessFunction]]'s context + */ + var contextTerm = "ctx" + + /** * @return returns if null checking is enabled */ def nullCheck: Boolean = config.getNullCheck @@ -699,6 +705,17 @@ class CodeGenerator( List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) } + + // ProcessFunction + else if (clazz == classOf[ProcessFunction[_, _]]) { + val baseClass = classOf[ProcessFunction[_, _]] + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + (baseClass, + s"void processElement(Object _in1, " + + s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," + + s"org.apache.flink.util.Collector $collectorTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + } else { // TODO more functions throw new CodeGenException("Unsupported Function.") @@ -1312,9 +1329,11 @@ class CodeGenerator( throw new CodeGenException("Dynamic parameter references are not supported yet.") override def visitCall(call: RexCall): GeneratedExpression = { - // time materialization is not implemented yet + // special case: time materialization if (call.getOperator == TimeMaterializationSqlFunction) { - throw new CodeGenException("Access to time attributes is not possible yet.") + return generateRecordTimestamp( + FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType) + ) } val operands = call.getOperands.map(_.accept(this)) @@ -1840,6 +1859,30 @@ class CodeGenerator( } } + private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = { + val resultTerm = newName("result") + val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + + val resultCode = if (isEventTime) { + s""" + |$resultTypeTerm $resultTerm; + |if ($contextTerm.timestamp() == null) { + | throw new RuntimeException("Rowtime timestamp is null. 
Please make sure that a proper " + + | "TimestampAssigner is defined and the stream environment uses the EventTime time " + + | "characteristic."); + |} + |else { + | $resultTerm = $contextTerm.timestamp(); + |} + |""".stripMargin + } else { + s""" + |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime(); + |""".stripMargin + } + GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP) + } + // ---------------------------------------------------------------------------------------------- // Reusable code snippets // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index e875587..9b486e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.plan.{RelOptCost, RelOptPlanner} import org.apache.calcite.rex._ -import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.functions.Function import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} @@ -30,16 +30,17 @@ import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait CommonCalc[T] { +trait CommonCalc { - private[flink] def generateFunction( + private[flink] def generateFunction[T <: Function]( generator: CodeGenerator, ruleDescription: String, inputSchema: RowSchema, returnSchema: RowSchema, calcProgram: RexProgram, - config: TableConfig): - GeneratedFunction[FlatMapFunction[Row, Row], Row] = { + config: TableConfig, + functionClass: Class[T]): + GeneratedFunction[T, Row] = { val expandedExpressions = calcProgram .getProjectList @@ -92,7 +93,7 @@ trait CommonCalc[T] { generator.generateFunction( ruleDescription, - classOf[FlatMapFunction[Row, Row]], + functionClass, body, returnSchema.physicalTypeInfo) } http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index c95f2f7..874bea2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle} import org.apache.calcite.sql.SemiJoinType -import org.apache.flink.api.common.functions.FlatMapFunction +import 
org.apache.flink.api.common.functions.{FlatMapFunction, Function} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue @@ -36,22 +36,22 @@ import scala.collection.JavaConverters._ /** * Join a user-defined table function */ -trait CommonCorrelate[T] { +trait CommonCorrelate { /** * Generates the flat map function to run the user-defined table function. */ - private[flink] def generateFunction( + private[flink] def generateFunction[T <: Function]( config: TableConfig, inputSchema: RowSchema, udtfTypeInfo: TypeInformation[Any], returnSchema: RowSchema, - rowType: RelDataType, joinType: SemiJoinType, rexCall: RexCall, pojoFieldMapping: Option[Array[Int]], - ruleDescription: String): - GeneratedFunction[FlatMapFunction[Row, Row], Row] = { + ruleDescription: String, + functionClass: Class[T]): + GeneratedFunction[T, Row] = { val functionGenerator = new CodeGenerator( config, @@ -89,7 +89,7 @@ trait CommonCorrelate[T] { val outerResultExpr = functionGenerator.generateResultExpression( input1AccessExprs ++ input2NullExprs, returnSchema.physicalTypeInfo, - rowType.getFieldNames.asScala) + returnSchema.physicalFieldNames) body += s""" |boolean hasOutput = $collectorTerm.isCollected(); @@ -104,7 +104,7 @@ trait CommonCorrelate[T] { functionGenerator.generateFunction( ruleDescription, - classOf[FlatMapFunction[Row, Row]], + functionClass, body, returnSchema.physicalTypeInfo) } @@ -117,7 +117,6 @@ trait CommonCorrelate[T] { inputSchema: RowSchema, udtfTypeInfo: TypeInformation[Any], returnSchema: RowSchema, - rowType: RelDataType, condition: Option[RexNode], pojoFieldMapping: Option[Array[Int]]) : GeneratedCollector = { @@ -135,7 +134,7 @@ trait CommonCorrelate[T] { val crossResultExpr = generator.generateResultExpression( input1AccessExprs ++ input2AccessExprs, returnSchema.physicalTypeInfo, - rowType.getFieldNames.asScala) + returnSchema.physicalFieldNames) val collectorCode = if (condition.isEmpty) { s""" http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index e340a8c..9a9f738 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex._ +import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.BatchTableEnvironment @@ -46,7 +47,7 @@ class DataSetCalc( calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc[Row] + with CommonCalc with DataSetRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -95,7 +96,8 @@ class DataSetCalc( new RowSchema(getInput.getRowType), new RowSchema(getRowType), 
calcProgram, - config) + config, + classOf[FlatMapFunction[Row, Row]]) val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType) http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala index 49ead26..731d2e5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala @@ -23,6 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo @@ -49,7 +50,7 @@ class DataSetCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, inputNode) - with CommonCorrelate[Row] + with CommonCorrelate with DataSetRel { override def deriveRowType() = relRowType @@ -109,18 +110,17 @@ class DataSetCorrelate( new RowSchema(getInput.getRowType), udtfTypeInfo, new RowSchema(getRowType), - rowType, joinType, rexCall, pojoFieldMapping, - ruleDescription) + ruleDescription, + classOf[FlatMapFunction[Row, Row]]) val collector = generateCollector( config, new RowSchema(getInput.getRowType), udtfTypeInfo, new RowSchema(getRowType), - rowType, condition, pojoFieldMapping) http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 5f270f6..f75efc8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -25,11 +25,12 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexProgram import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} -import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.nodes.CommonCalc -import org.apache.flink.table.runtime.CRowFlatMapRunner +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** @@ -45,7 +46,7 @@ class DataStreamCalc( calcProgram: RexProgram, 
ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc[CRow] + with CommonCalc with DataStreamRel { override def deriveRowType(): RelDataType = schema.logicalType @@ -101,17 +102,18 @@ class DataStreamCalc( inputSchema, schema, calcProgram, - config) + config, + classOf[ProcessFunction[CRow, CRow]]) val inputParallelism = inputDataStream.getParallelism - val mapFunc = new CRowFlatMapRunner( + val processFunc = new CRowProcessRunner( genFunction.name, genFunction.code, CRowTypeInfo(schema.physicalTypeInfo)) inputDataStream - .flatMap(mapFunc) + .process(processFunc) .name(calcOpName(calcProgram, getExpressionString)) // keep parallelism to ensure order of accumulate and retract messages .setParallelism(inputParallelism) http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index 5b32b10..b7165cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -23,12 +23,13 @@ import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.nodes.CommonCorrelate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner +import org.apache.flink.table.runtime.CRowCorrelateProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** @@ -46,7 +47,7 @@ class DataStreamCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, input) - with CommonCorrelate[CRow] + with CommonCorrelate with DataStreamRel { override def deriveRowType() = schema.logicalType @@ -90,7 +91,6 @@ class DataStreamCorrelate( // we do not need to specify input type val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo] val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] @@ -98,37 +98,36 @@ class DataStreamCorrelate( val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val flatMap = generateFunction( + val process = generateFunction( config, inputSchema, udtfTypeInfo, schema, - getRowType, joinType, rexCall, pojoFieldMapping, - ruleDescription) + ruleDescription, + classOf[ProcessFunction[CRow, CRow]]) val collector = generateCollector( config, inputSchema, udtfTypeInfo, schema, - getRowType, condition, pojoFieldMapping) - val mapFunc = new 
CRowCorrelateFlatMapRunner( - flatMap.name, - flatMap.code, + val processFunc = new CRowCorrelateProcessRunner( + process.name, + process.code, collector.name, collector.code, - CRowTypeInfo(flatMap.returnType)) + CRowTypeInfo(process.returnType)) val inputParallelism = inputDS.getParallelism inputDS - .flatMap(mapFunc) + .process(processFunc) // preserve input parallelism to ensure that acc and retract messages remain in order .setParallelism(inputParallelism) .name(correlateOpName(rexCall, sqlFunction, schema.logicalType)) http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 51e609f..72ecac5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -49,7 +49,7 @@ class StreamTableSourceScan( val fieldCnt = fieldNames.length val rowtime = tableSource match { - case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => + case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute Some((fieldCnt, rowtimeAttribute)) case _ => @@ -57,7 +57,7 @@ class StreamTableSourceScan( } val proctime = tableSource match { - case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => + case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala index 0ca079e..ec90392 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala @@ -34,7 +34,7 @@ class FlinkLogicalCalc( calcProgram: RexProgram) extends Calc(cluster, traitSet, input, calcProgram) with FlinkLogicalRel - with CommonCalc[Any] { + with CommonCalc { override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { new FlinkLogicalCalc(cluster, traitSet, child, program) http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index a2777ec..3ae949e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.schema.TableSourceTable -import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource} +import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource} import scala.collection.JavaConverters._ @@ -54,7 +54,7 @@ class FlinkLogicalTableSourceScan( val fieldCnt = fieldNames.length val rowtime = tableSource match { - case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => + case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute Some((fieldCnt, rowtimeAttribute)) case _ => @@ -62,7 +62,7 @@ class FlinkLogicalTableSourceScan( } val proctime = tableSource match { - case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => + case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 28efcf5..d57d4cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.{TableException, Window} import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference} +import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference} import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule import org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -68,10 +68,12 @@ class DataStreamLogicalWindowAggregateRule case _ => throw new TableException("Only constant window descriptors are supported.") } - def getOperandAsTimeIndicator(call: RexCall, idx: Int): String = + def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference = call.getOperands.get(idx) match { case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) => - 
rowType.getFieldList.get(v.getIndex).getName + ResolvedFieldReference( + rowType.getFieldList.get(v.getIndex).getName, + FlinkTypeFactory.toTypeInfo(v.getType)) case _ => throw new TableException("Window can only be defined over a time attribute column.") } @@ -82,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(UnresolvedFieldReference(time)).as("w$") + w.on(time).as("w$") case SqlStdOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) @@ -91,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(UnresolvedFieldReference(time)).as("w$") + w.on(time).as("w$") case SqlStdOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(UnresolvedFieldReference(time)).as("w$") + w.on(time).as("w$") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala index b42be82..ccbe44d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala @@ -20,11 +20,12 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType} import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.TimeMaterializationSqlFunction import org.apache.flink.types.Row import scala.collection.JavaConversions._ @@ -76,6 +77,14 @@ class RowSchema(private val logicalRowType: RelDataType) { override def visitInputRef(inputRef: RexInputRef): RexNode = { new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType) } + + override def visitCall(call: RexCall): RexNode = call.getOperator match { + // we leave time indicators unchanged yet + // the index becomes invalid but right now we are only + // interested in the type of the input reference + case TimeMaterializationSqlFunction => call + case _ => super.visitCall(call) + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala index 75deca5..fa15288 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala @@ -19,10 +19,10 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource} +import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource} class StreamTableSourceTable[T]( override val tableSource: TableSource[T], @@ -39,7 +39,7 @@ class StreamTableSourceTable[T]( val fieldCnt = fieldNames.length val rowtime = tableSource match { - case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => + case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute Some((fieldCnt, rowtimeAttribute)) case _ => @@ -47,7 +47,7 @@ class StreamTableSourceTable[T]( } val proctime = tableSource match { - case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => + case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala deleted file mode 100644 index ff3821a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.runtime - -import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} -import org.apache.flink.api.common.functions.util.FunctionUtils -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.table.codegen.Compiler -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.types.Row -import org.apache.flink.util.Collector -import org.slf4j.{Logger, LoggerFactory} - -/** - * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output. - */ -class CRowCorrelateFlatMapRunner( - flatMapName: String, - flatMapCode: String, - collectorName: String, - collectorCode: String, - @transient var returnType: TypeInformation[CRow]) - extends RichFlatMapFunction[CRow, CRow] - with ResultTypeQueryable[CRow] - with Compiler[Any] { - - val LOG: Logger = LoggerFactory.getLogger(this.getClass) - - private var function: FlatMapFunction[Row, Row] = _ - private var collector: TableFunctionCollector[_] = _ - private var cRowWrapper: CRowWrappingCollector = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode) - LOG.debug("Instantiating TableFunctionCollector.") - collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]] - this.cRowWrapper = new CRowWrappingCollector() - - LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode") - val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode) - val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]]) - LOG.debug("Instantiating FlatMapFunction.") - function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]] - FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) - FunctionUtils.openFunction(function, parameters) - } - - override def flatMap(in: CRow, out: Collector[CRow]): Unit = { - cRowWrapper.out = out - cRowWrapper.setChange(in.change) - - collector.setCollector(cRowWrapper) - collector.setInput(in.row) - collector.reset() - - function.flatMap(in.row, cRowWrapper) - } - - override def getProducedType: TypeInformation[CRow] = returnType - - override def close(): Unit = { - FunctionUtils.closeFunction(function) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala new file mode 100644 index 0000000..4f0a785 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
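----------------------------------------------------------------------

The runners are rewritten from FlatMapFunction to ProcessFunction wrappers because only the ProcessFunction context exposes the record timestamp that generated code needs when materializing a rowtime attribute. A hand-written sketch of what such generated code amounts to for a projection that materializes event time (types simplified to Scala tuples; this is not the generated code itself):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class MaterializingProjection
    extends ProcessFunction[(String, Int), (String, java.sql.Timestamp)] {

  override def processElement(
      in: (String, Int),
      ctx: ProcessFunction[(String, Int), (String, java.sql.Timestamp)]#Context,
      out: Collector[(String, java.sql.Timestamp)]): Unit = {

    // the rowtime value comes from the context, not from an input field
    val ts = ctx.timestamp()
    if (ts == null) {
      // same failure mode as the generated code
      throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " +
        "TimestampAssigner is defined and the stream environment uses the EventTime time " +
        "characteristic.")
    }
    out.collect((in._1, new java.sql.Timestamp(ts)))
  }
}

For processing time, the generated code instead reads ctx.timerService().currentProcessingTime(), as generateRecordTimestamp above shows.

----------------------------------------------------------------------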
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * A CorrelateProcessRunner with [[CRow]] input and [[CRow]] output.
+ */
+class CRowCorrelateProcessRunner(
+    processName: String,
+    processCode: String,
+    collectorName: String,
+    collectorCode: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[Any] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: ProcessFunction[Row, Row] = _
+  private var collector: TableFunctionCollector[_] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
+    LOG.debug("Instantiating TableFunctionCollector.")
+    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+    this.cRowWrapper = new CRowWrappingCollector()
+
+    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
+    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
+    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
+    LOG.debug("Instantiating ProcessFunction.")
+    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow])
+    : Unit = {
+
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+
+    collector.setCollector(cRowWrapper)
+    collector.setInput(in.row)
+    collector.reset()
+
+    function.processElement(
+      in.row,
+      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+      cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
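
The move from RichFlatMapFunction to ProcessFunction is what gives the compiled
code access to the per-record Context (timestamp and timer service) that
materializing time indicators requires. As a rough, hand-written stand-in for
the kind of function such a runner compiles (illustrative only; the real input
is produced by the code generator, and the actual materialized type may differ):

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Copies the input fields and appends the current processing time as a long,
    // roughly what materializing a proctime indicator amounts to.
    class MaterializeProctime extends ProcessFunction[Row, Row] {
      override def processElement(
          in: Row,
          ctx: ProcessFunction[Row, Row]#Context,
          out: Collector[Row]): Unit = {
        val result = new Row(in.getArity + 1)
        for (i <- 0 until in.getArity) {
          result.setField(i, in.getField(i))
        }
        result.setField(in.getArity, ctx.timerService().currentProcessingTime())
        out.collect(result)
      }
    }
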
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
deleted file mode 100644
index 9701cb9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.util.FunctionUtils
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.slf4j.LoggerFactory
-
-/**
- * FlatMapRunner with [[CRow]] input and [[CRow]] output.
- */
-class CRowFlatMapRunner(
-    name: String,
-    code: String,
-    @transient var returnType: TypeInformation[CRow])
-  extends RichFlatMapFunction[CRow, CRow]
-  with ResultTypeQueryable[CRow]
-  with Compiler[FlatMapFunction[Row, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: FlatMapFunction[Row, Row] = _
-  private var cRowWrapper: CRowWrappingCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating FlatMapFunction.")
-    function = clazz.newInstance()
-    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
-    FunctionUtils.openFunction(function, parameters)
-
-    this.cRowWrapper = new CRowWrappingCollector()
-  }
-
-  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
-    cRowWrapper.out = out
-    cRowWrapper.setChange(in.change)
-    function.flatMap(in.row, cRowWrapper)
-  }
-
-  override def getProducedType: TypeInformation[CRow] = returnType
-
-  override def close(): Unit = {
-    FunctionUtils.closeFunction(function)
-  }
-}
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
new file mode 100644
index 0000000..cef62a5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+ * ProcessRunner with [[CRow]] input and [[CRow]] output.
+ */
+class CRowProcessRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Row, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: ProcessFunction[Row, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating ProcessFunction.")
+    function = clazz.newInstance()
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow])
+    : Unit = {
+
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+    function.processElement(
+      in.row,
+      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
+      cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
+
+
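
Design note: apart from the ProcessFunction signature, CRowProcessRunner mirrors
the removed CRowFlatMapRunner almost line for line. The only functional
difference is that processElement receives a Context, which is forwarded (cast
to its Row-typed variant) so that the generated function can reach record
timestamps and time services when it has to materialize a time indicator.
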
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
deleted file mode 100644
index 6d87663..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-/**
- * Defines a logical event-time attribute for a [[TableSource]].
- * The event-time attribute can be used for indicating, accessing, and working with Flink's
- * event-time.
- *
- * A [[TableSource]] that implements this interface defines the name of
- * the event-time attribute. The attribute will be added to the schema of the
- * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
- */
-trait DefinedRowTimeAttribute {
-
-  /**
-   * Defines a name of the event-time attribute that represents Flink's
-   * event-time. Null if no rowtime should be available.
-   *
-   * The field will be appended to the schema provided by the [[TableSource]].
-   */
-  def getRowtimeAttribute: String
-}
-
-/**
- * Defines a logical processing-time attribute for a [[TableSource]].
- * The processing-time attribute can be used for indicating, accessing, and working with Flink's
- * processing-time.
- *
- * A [[TableSource]] that implements this interface defines the name of
- * the processing-time attribute. The attribute will be added to the schema of the
- * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
- */
-trait DefinedProcTimeAttribute {
-
-  /**
-   * Defines a name of the processing-time attribute that represents Flink's
-   * processing-time. Null if no rowtime should be available.
-   *
-   * The field will be appended to the schema provided by the [[TableSource]].
-   */
-  def getProctimeAttribute: String
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
new file mode 100644
index 0000000..d381115
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+ * Defines a logical event-time attribute for a [[TableSource]].
+ * The event-time attribute can be used for indicating, accessing, and working with Flink's
+ * event-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the event-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+ */
+trait DefinedRowtimeAttribute {
+
+  /**
+   * Defines a name of the event-time attribute that represents Flink's
+   * event-time. Null if no rowtime should be available.
+   *
+   * The field will be appended to the schema provided by the [[TableSource]].
+   */
+  def getRowtimeAttribute: String
+}
+
+/**
+ * Defines a logical processing-time attribute for a [[TableSource]].
+ * The processing-time attribute can be used for indicating, accessing, and working with Flink's
+ * processing-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the processing-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+ */
+trait DefinedProctimeAttribute {
+
+  /**
+   * Defines a name of the processing-time attribute that represents Flink's
+   * processing-time. Null if no proctime should be available.
+   *
+   * The field will be appended to the schema provided by the [[TableSource]].
+   */
+  def getProctimeAttribute: String
+
+}
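
A table source opts into these logical attributes simply by mixing in the
traits, as the reworked test sources further below do. As a minimal,
self-contained sketch (the class name and field types here are hypothetical,
assuming the TableSource interface of this Flink version):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource}
    import org.apache.flink.types.Row

    // Hypothetical source with physical fields (id, name) that asks the Table API
    // to append an event-time attribute named "rowtime" to its schema.
    class MyRowtimeSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {

      override def getRowtimeAttribute: String = "rowtime"

      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(Types.LONG, Types.STRING)
    }
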
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index e9384c7..7797f22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -19,18 +19,16 @@ package org.apache.flink.table.api.scala.stream
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.util.Collections
-import java.util.{List => JList}
 
 import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
@@ -151,7 +149,9 @@ class StreamTableEnvironmentTest extends TableTestBase {
 
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
-    val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv]))
+    val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+    when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+    val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index 18066c9..cda90f7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -24,7 +24,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
 import org.apache.flink.types.Row
@@ -35,7 +35,7 @@ class TableSourceTest extends TableTestBase {
 
   @Test
   def testRowTimeTableSourceSimple(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
 
     val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
 
@@ -43,7 +43,7 @@
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
-        term("select", "addTime", "id", "name", "val")
+        term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
 
@@ -51,7 +51,7 @@
   @Test
   def testRowTimeTableSourceGroupWindow(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+    util.tEnv.registerTableSource("rowTimeT", new TestRowtimeSource("addTime"))
 
     val t = util.tEnv.scan("rowTimeT")
       .filter("val > 100")
@@ -82,7 +82,7 @@
   @Test
   def testProcTimeTableSourceSimple(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
 
     val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
 
@@ -90,7 +90,7 @@
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
-        term("select", "pTime", "id", "name", "val")
+        term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
 
@@ -98,7 +98,7 @@
   @Test
   def testProcTimeTableSourceOverWindow(): Unit = {
     val util = streamTestUtil()
-    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+    util.tEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
 
     val t = util.tEnv.scan("procTimeT")
       .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
@@ -123,8 +123,8 @@
   }
 }
 
-class TestRowTimeSource(timeField: String)
-  extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+class TestRowtimeSource(timeField: String)
+  extends StreamTableSource[Row] with DefinedRowtimeAttribute {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
 
@@ -137,8 +137,8 @@ class TestRowTimeSource(timeField: String)
   }
 }
 
-class TestProcTimeSource(timeField: String)
-  extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+class TestProctimeSource(timeField: String)
+  extends StreamTableSource[Row] with DefinedProctimeAttribute {
 
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???