From: fhueske@apache.org
To: commits@flink.apache.org
Date: Wed, 03 May 2017 12:10:57 -0000
Subject: [50/50] [abbrv] flink git commit: [FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.

[FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.
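For illustration only -- a minimal sketch of how the retraction-aware DataStream conversion introduced by this commit could be used from the Scala Table API. The environment setup, input data, and field names below are hypothetical and not part of the commit:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object RetractStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical input stream of (user, amount) events
    val input = env.fromElements(("alice", 10L), ("bob", 5L), ("alice", 7L))
    val table = input.toTable(tEnv, 'user, 'amount)

    // a grouped aggregation updates previously emitted results, so the result
    // is converted into a stream of (Boolean, T) change messages
    val result = table
      .groupBy('user)
      .select('user, 'amount.sum as 'total)
      .toRetractStream[(String, Long)]

    // true flag = accumulate (add) message, false flag = retract message
    result.print()
    env.execute()
  }
}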
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e265620d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e265620d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e265620d

Branch: refs/heads/table-retraction
Commit: e265620d44ca58c5f79eedb8644db14a1c99e874
Parents: 917f724
Author: Fabian Hueske
Authored: Fri Apr 28 01:59:57 2017 +0200
Committer: Fabian Hueske
Committed: Wed May 3 11:42:52 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSink.java | 6 +-
 .../flink/table/api/BatchTableEnvironment.scala | 2 +-
 .../table/api/StreamTableEnvironment.scala | 409 ++++++++++-----
 .../apache/flink/table/api/TableConfig.scala | 17 -
 .../flink/table/api/TableEnvironment.scala | 18 +-
 .../table/api/java/StreamTableEnvironment.scala | 82 ++-
 .../api/scala/StreamTableEnvironment.scala | 32 +-
 .../table/api/scala/TableConversions.scala | 19 +
 .../org/apache/flink/table/api/table.scala | 4 +-
 .../plan/nodes/datastream/DataStreamCalc.scala | 13 +-
 .../nodes/datastream/DataStreamCorrelate.scala | 8 +-
 .../datastream/DataStreamGroupAggregate.scala | 2 +
 .../DataStreamGroupWindowAggregate.scala | 4 +
 .../nodes/datastream/retractionTraits.scala | 37 +-
 .../datastream/DataStreamRetractionRules.scala | 16 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala | 2 +-
 .../flink/table/runtime/CRowFlatMapRunner.scala | 2 +-
 .../table/runtime/CRowInputMapRunner.scala | 2 +-
 .../runtime/CRowInputTupleOutputMapRunner.scala | 53 +-
 .../table/runtime/CRowOutputMapRunner.scala | 2 +-
 .../table/runtime/CorrelateFlatMapRunner.scala | 2 +-
 .../flink/table/runtime/FlatJoinRunner.scala | 2 +-
 .../flink/table/runtime/FlatMapRunner.scala | 2 +-
 .../flink/table/runtime/MapJoinLeftRunner.scala | 2 +-
 .../table/runtime/MapJoinRightRunner.scala | 2 +-
 .../apache/flink/table/runtime/MapRunner.scala | 2 +-
 .../flink/table/runtime/MapSideJoinRunner.scala | 2 +-
 ...aSetSessionWindowAggregatePreProcessor.scala | 2 +-
 .../aggregate/GroupAggProcessFunction.scala | 55 +-
 .../runtime/io/CRowValuesInputFormat.scala | 2 +-
 .../table/runtime/io/ValuesInputFormat.scala | 2 +-
 .../table/sinks/AppendStreamTableSink.scala | 36 ++
 .../apache/flink/table/sinks/CsvTableSink.scala | 100 +---
 .../table/sinks/RetractStreamTableSink.scala | 55 ++
 .../flink/table/sinks/StreamRetractSink.scala | 35 --
 .../flink/table/sinks/StreamTableSink.scala | 32 --
 .../table/sinks/UpsertStreamTableSink.scala | 79 +++
 .../flink/table/TableEnvironmentTest.scala | 1 -
 .../api/scala/stream/RetractionITCase.scala | 106 ++--
 .../api/scala/stream/TableSinkITCase.scala | 33 +-
 .../table/api/scala/stream/sql/SqlITCase.scala | 102 ++--
 .../stream/table/GroupAggregationsITCase.scala | 61 +--
 .../api/scala/stream/table/OverWindowTest.scala | 16 +-
 .../api/scala/stream/utils/StreamITCase.scala | 35 +-
 .../table/plan/rules/RetractionRulesTest.scala | 2 +-
 .../table/sinks/StreamTableSinksITCase.scala | 511 +++++++++++++++++++
 .../table/utils/MockTableEnvironment.scala | 5 -
 .../flink/table/utils/TableTestBase.scala | 4 +-
 48 files changed, 1414 insertions(+), 604 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 97f5fba..a8a2fd0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -29,12 +29,12 @@ import org.apache.flink.util.Preconditions; import java.util.Properties; /** - * A version-agnostic Kafka {@link StreamTableSink}. + * A version-agnostic Kafka {@link AppendStreamTableSink}. * *

The version-specific Kafka consumers need to extend this class and * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}. */ -public abstract class KafkaTableSink implements StreamTableSink { +public abstract class KafkaTableSink implements AppendStreamTableSink { protected final String topic; protected final Properties properties; http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index d8d93b0..7f5becb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -138,7 +138,7 @@ abstract class BatchTableEnvironment( * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - override protected def getConversionMapper[IN, OUT]( + protected def getConversionMapper[IN, OUT]( physicalTypeInfo: TypeInformation[IN], logicalRowType: RelDataType, requestedTypeInfo: TypeInformation[OUT], http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 31ae78b..a839fcb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -19,28 +19,33 @@ package org.apache.flink.table.api import _root_.java.util.concurrent.atomic.AtomicInteger +import _root_.java.lang.{Boolean => JBool} import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.plan.hep.HepMatchOrder -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} +import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.Expression import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, 
UpdateAsRetractionTrait} +import org.apache.flink.table.plan.nodes.datastream._ import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable} -import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner} +import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink} +import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.types.Row @@ -162,26 +167,61 @@ abstract class StreamTableEnvironment( override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { - case streamSink: StreamTableSink[T] => + + case retractSink: RetractStreamTableSink[_] => + // retraction sink can always be used val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. - val result: DataStream[T] = translate(table)(outputType) - // Give the DataSet to the TableSink to emit it. - streamSink.emitDataStream(result) - - case streamRetractSink: StreamRetractSink[T] => + val result: DataStream[T] = + translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType) + // Give the DataStream to the TableSink to emit it. + retractSink.asInstanceOf[RetractStreamTableSink[Any]] + .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) + + case upsertSink: UpsertStreamTableSink[_] => + // optimize plan + val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false) + // check for append only table + val isAppendOnlyTable = isAppendOnly(optimizedPlan) + upsertSink.setIsAppendOnly(isAppendOnlyTable) + // extract unique key fields + val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan) + // check that we have keys if the table has changes (is not append-only) + tableKeys match { + case Some(keys) => upsertSink.setKeyFields(keys) + case None if isAppendOnlyTable => upsertSink.setKeyFields(null) + case None if !isAppendOnlyTable => throw new TableException( + "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + } val outputType = sink.getOutputType - this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction) // translate the Table into a DataStream and provide the type that the TableSink expects. - val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType) - // Give the DataSet to the TableSink to emit it. - streamRetractSink.emitDataStreamWithChange(result) + val result: DataStream[T] = + translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType) + // Give the DataStream to the TableSink to emit it. 
+ upsertSink.asInstanceOf[UpsertStreamTableSink[Any]] + .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) + + case appendSink: AppendStreamTableSink[_] => + // optimize plan + val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false) + // verify table is an insert-only (append-only) table + if (!isAppendOnly(optimizedPlan)) { + throw new TableException( + "AppendStreamTableSink requires that Table has only insert changes.") + } + val outputType = sink.getOutputType + // translate the Table into a DataStream and provide the type that the TableSink expects. + val result: DataStream[T] = + translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType) + // Give the DataStream to the TableSink to emit it. + appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result) + case _ => - throw new TableException("StreamTableSink required to emit streaming Table") + throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " + + "RetractStreamTableSink, or UpsertStreamTableSink.") } } - /** * Creates a final converter that maps the internal row type to external type. * @@ -191,23 +231,18 @@ abstract class StreamTableEnvironment( * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - override protected def getConversionMapper[IN, OUT]( + protected def getConversionMapper[IN, OUT]( physicalTypeInfo: TypeInformation[IN], logicalRowType: RelDataType, requestedTypeInfo: TypeInformation[OUT], functionName: String): - Option[MapFunction[IN, OUT]] = { + MapFunction[IN, OUT] = { - if (requestedTypeInfo.getTypeClass == classOf[CRow]) { - // only used to explain table - None - } else if (requestedTypeInfo.getTypeClass == classOf[Row]) { + if (requestedTypeInfo.getTypeClass == classOf[Row]) { // CRow to Row, only needs to be unwrapped - Some( - new MapFunction[CRow, Row] { - override def map(value: CRow): Row = value.row - }.asInstanceOf[MapFunction[IN, OUT]] - ) + new MapFunction[CRow, Row] { + override def map(value: CRow): Row = value.row + }.asInstanceOf[MapFunction[IN, OUT]] } else { // Some type that is neither CRow nor Row val converterFunction = generateRowConverterFunction[OUT]( @@ -217,42 +252,107 @@ abstract class StreamTableEnvironment( functionName ) - Some(new CRowInputMapRunner[OUT]( + new CRowInputMapRunner[OUT]( converterFunction.name, converterFunction.code, converterFunction.returnType) - .asInstanceOf[MapFunction[IN, OUT]]) + .asInstanceOf[MapFunction[IN, OUT]] } } + /** Validates that the plan produces only append changes. */ + protected def isAppendOnly(plan: RelNode): Boolean = { + val appendOnlyValidator = new AppendOnlyValidator + appendOnlyValidator.go(plan) + + appendOnlyValidator.isAppendOnly + } + + /** Extracts the unique keys of the table produced by the plan. */ + protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = { + val keyExtractor = new UniqueKeyExtractor + keyExtractor.go(plan) + keyExtractor.keys + } + /** - * Creates a final converter that maps the internal CRow type to external Tuple2 type. + * Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag. * * @param physicalTypeInfo the input of the sink * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) - * @param requestedTypeInfo the output type of the sink + * @param requestedTypeInfo the output type of the sink. 
* @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - protected def getTupleConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[OUT], - functionName: String): - Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = { + private def getConversionMapperWithChanges[OUT]( + physicalTypeInfo: TypeInformation[CRow], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + MapFunction[CRow, OUT] = { + + requestedTypeInfo match { + + // Scala tuple + case t: CaseClassTypeInfo[_] + if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => + + val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] + if (reqType.getTypeClass == classOf[Row]) { + // Requested type is Row. Just rewrap CRow in Tuple2 + new MapFunction[CRow, (Boolean, Row)] { + override def map(cRow: CRow): (Boolean, Row) = { + (cRow.change, cRow.row) + } + }.asInstanceOf[MapFunction[CRow, OUT]] + } else { + // Use a map function to convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + logicalRowType, + reqType, + functionName + ) + + new CRowInputScalaTupleOutputMapRunner( + converterFunction.name, + converterFunction.code, + requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]]) + .asInstanceOf[MapFunction[CRow, OUT]] - val converterFunction = generateRowConverterFunction( - physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - logicalRowType, - requestedTypeInfo, - functionName - ) + } - Some(new CRowInputTupleOutputMapRunner[OUT]( - converterFunction.name, - converterFunction.code, - new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo)) - .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]]) + // Java tuple + case t: TupleTypeInfo[_] + if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => + + val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] + if (reqType.getTypeClass == classOf[Row]) { + // Requested type is Row. Just rewrap CRow in Tuple2 + new MapFunction[CRow, JTuple2[JBool, Row]] { + val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row]) + override def map(cRow: CRow): JTuple2[JBool, Row] = { + outT.f0 = cRow.change + outT.f1 = cRow.row + outT + } + }.asInstanceOf[MapFunction[CRow, OUT]] + } else { + // Use a map function to convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + logicalRowType, + reqType, + functionName + ) + + new CRowInputJavaTupleOutputMapRunner( + converterFunction.name, + converterFunction.code, + requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]]) + .asInstanceOf[MapFunction[CRow, OUT]] + } + } } /** @@ -338,9 +438,10 @@ abstract class StreamTableEnvironment( * Generates the optimized [[RelNode]] tree from the original relational node tree. * * @param relNode The root node of the relational expression tree. + * @param updatesAsRetraction True if the sink requests updates as retraction messages. * @return The optimized [[RelNode]] tree */ - private[flink] def optimize(relNode: RelNode): RelNode = { + private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = { // 1. 
decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) @@ -365,7 +466,7 @@ abstract class StreamTableEnvironment( // 4. optimize the physical Flink plan val physicalOptRuleSet = getPhysicalOptRuleSet val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() - var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { + val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps) } else { logicalPlan @@ -374,13 +475,18 @@ abstract class StreamTableEnvironment( // 5. decorate the optimized plan val decoRuleSet = getDecoRuleSet val decoratedPlan = if (decoRuleSet.iterator().hasNext) { - - if (this.config.getNeedsUpdatesAsRetractionForSink) { - physicalPlan = physicalPlan.copy( + val planToDecorate = if (updatesAsRetraction) { + physicalPlan.copy( physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)), physicalPlan.getInputs) + } else { + physicalPlan } - runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet) + runHepPlanner( + HepMatchOrder.BOTTOM_UP, + decoRuleSet, + planToDecorate, + planToDecorate.getTraitSet) } else { physicalPlan } @@ -395,14 +501,17 @@ abstract class StreamTableEnvironment( * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. * * @param table The root node of the relational expression tree. + * @param updatesAsRetraction Set to true to encode updates as retraction messages. + * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { + protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean) + (implicit tpe: TypeInformation[A]): DataStream[A] = { val relNode = table.getRelNode - val dataStreamPlan = optimize(relNode) - translate(dataStreamPlan, relNode.getRowType) + val dataStreamPlan = optimize(relNode, updatesAsRetraction) + translate(dataStreamPlan, relNode.getRowType, withChangeFlag) } /** @@ -411,87 +520,57 @@ abstract class StreamTableEnvironment( * @param logicalPlan The root node of the relational expression tree. * @param logicalType The row type of the result. Since the logicalPlan can lose the * field naming during optimization we pass the row type separately. + * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ protected def translate[A]( logicalPlan: RelNode, - logicalType: RelDataType) + logicalType: RelDataType, + withChangeFlag: Boolean) (implicit tpe: TypeInformation[A]): DataStream[A] = { - TableEnvironment.validateType(tpe) + // if no change flags are requested, verify table is an insert-only (append-only) table. + if (!withChangeFlag && !isAppendOnly(logicalPlan)) { + throw new TableException( + "Table is not an append-only table. 
" + + "Output needs to handle update and delete changes.") + } - logicalPlan match { - case node: DataStreamRel => - val plan = node.translateToPlan(this) - val conversion = - getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") - conversion match { - case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary - case Some(mapFunction: MapFunction[CRow, A]) => - plan.map(mapFunction) - .returns(tpe) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .asInstanceOf[DataStream[A]] - } + // get CRow plan + val plan: DataStream[CRow] = translateToCRow(logicalPlan) - case _ => - throw TableException("Cannot generate DataStream due to an invalid logical plan. " + - "This is a bug and should not happen. Please file an issue.") + // convert CRow to output type + val conversion = if (withChangeFlag) { + getConversionMapperWithChanges(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + } else { + getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") } - } - /** - * Translates a [[Table]] into a [[DataStream]] with change information. - * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. - * - * @param table The root node of the relational expression tree. - * @param wrapToTuple True, if want to output chang information - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. - */ - protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]) - : DataStream[JTuple2[Boolean, A]] = { - val relNode = table.getRelNode - val dataStreamPlan = optimize(relNode) - translate(dataStreamPlan, relNode.getRowType, wrapToTuple) + val rootParallelism = plan.getParallelism + + conversion match { + case mapFunction: MapFunction[CRow, A] => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .setParallelism(rootParallelism) + } } /** - * Translates a logical [[RelNode]] into a [[DataStream]] with change information. + * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]]. * - * @param logicalPlan The root node of the relational expression tree. - * @param logicalType The row type of the result. Since the logicalPlan can lose the - * @param wrapToTuple True, if want to output chang information - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. + * @param logicalPlan The logical plan to translate. + * @return The [[DataStream]] of type [[CRow]]. 
*/ - protected def translate[A]( - logicalPlan: RelNode, - logicalType: RelDataType, - wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = { - - TableEnvironment.validateType(tpe) + protected def translateToCRow( + logicalPlan: RelNode): DataStream[CRow] = { logicalPlan match { case node: DataStreamRel => - val plan = node.translateToPlan(this) - val conversion = - getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") - conversion match { - case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary - case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) => - plan.map(mapFunction) - .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe)) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .asInstanceOf[DataStream[JTuple2[Boolean, A]]] - } - + node.translateToPlan(this) case _ => throw TableException("Cannot generate DataStream due to an invalid logical plan. " + "This is a bug and should not happen. Please file an issue.") @@ -506,10 +585,8 @@ abstract class StreamTableEnvironment( */ def explain(table: Table): String = { val ast = table.getRelNode - val optimizedPlan = optimize(ast) - val dataStream = translate[CRow]( - optimizedPlan, - ast.getRowType)(new GenericTypeInfo(classOf[CRow])) + val optimizedPlan = optimize(ast, updatesAsRetraction = false) + val dataStream = translateToCRow(optimizedPlan) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan @@ -529,4 +606,90 @@ abstract class StreamTableEnvironment( s"$sqlPlan" } + private class AppendOnlyValidator extends RelVisitor { + + var isAppendOnly = true + + override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { + case s: DataStreamRel if s.producesUpdates => + isAppendOnly = false + case _ => + super.visit(node, ordinal, parent) + } + } + } + + /** Identifies unique key fields in the output of a RelNode. */ + private class UniqueKeyExtractor extends RelVisitor { + + var keys: Option[Array[String]] = None + + override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { + case c: DataStreamCalc => + super.visit(node, ordinal, parent) + // check if input has keys + if (keys.isDefined) { + // track keys forward + val inNames = c.getInput.getRowType.getFieldNames + val inOutNames = c.getProgram.getNamedProjects.asScala + .map(p => { + c.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case i: RexInputRef => (i.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => + a.getOperands.get(0) match { + case ref: RexInputRef => + (ref.getIndex, p.right) + case _ => + (-1, p.right) + } + // output field is not forwarded from input + case _: RexNode => (-1, p.right) + } + }) + // filter all non-forwarded fields + .filter(_._1 >= 0) + // resolve names of input fields + .map(io => (inNames.get(io._1), io._2)) + + // filter by input keys + val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2) + // check if all keys have been preserved + if (outKeys.nonEmpty && outKeys.length == keys.get.length) { + // all key have been preserved (but possibly renamed) + keys = Some(outKeys.toArray) + } else { + // some (or all) keys have been removed. 
Keys are no longer unique and removed + keys = None + } + } + case _: DataStreamOverAggregate => + super.visit(node, ordinal, parent) + // keys are always forwarded by Over aggregate + case a: DataStreamGroupAggregate => + // get grouping keys + val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length) + keys = Some(groupKeys.toArray) + case w: DataStreamGroupWindowAggregate => + // get grouping keys + val groupKeys = + w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray + // get window start and end time + val windowStartEnd = w.getWindowProperties.map(_.name) + // we have only a unique key if at least one window property is selected + if (windowStartEnd.nonEmpty) { + keys = Some(groupKeys ++ windowStartEnd) + } + case _: DataStreamRel => + // anything else does not forward keys or might duplicate key, so we can stop + keys = None + } + } + + } + } + http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index d296978..6448657 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -37,11 +37,6 @@ class TableConfig { private var nullCheck: Boolean = true /** - * Defines whether sink table requires that update and delete changes are sent with retraction - */ - private var needsUpdatesAsRetractionForSink: Boolean = false - - /** * Defines the configuration of Calcite for Table API and SQL queries. */ private var calciteConfig = CalciteConfig.DEFAULT @@ -72,18 +67,6 @@ class TableConfig { } /** - * Returns the need retraction property for table sink. - */ - def getNeedsUpdatesAsRetractionForSink = needsUpdatesAsRetractionForSink - - /** - * Set the need retraction property for table sink. - */ - def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean ): Unit = { - this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction - } - - /** * Returns the current configuration of Calcite for Table API and SQL queries. */ def getCalciteConfig: CalciteConfig = calciteConfig http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index ad7e3b5..28db97e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -595,7 +595,7 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException( "An input of GenericTypeInfo cannot be converted to Table. 
" + "Please specify the type of the input with a RowTypeInfo.") - case a: AtomicType[A] => + case a: AtomicType[_] => if (exprs.length != 1) { throw new TableException("Table of atomic type can only have a single field.") } @@ -657,22 +657,6 @@ abstract class TableEnvironment(val config: TableConfig) { (fieldNames.toArray, fieldIndexes.toArray) } - /** - * Creates a final converter that maps the internal row type to external type. - * - * @param physicalTypeInfo the input of the sink - * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) - * @param requestedTypeInfo the output type of the sink - * @param functionName name of the map function. Must not be unique but has to be a - * valid Java class identifier. - */ - protected def getConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[OUT], - functionName: String): - Option[MapFunction[IN, OUT]] - protected def generateRowConverterFunction[OUT]( inputTypeInfo: TypeInformation[Row], logicalRowType: RelDataType, http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index 4d9f1e1..6ba51c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -18,12 +18,14 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.table.api._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import _root_.java.lang.{Boolean => JBool} /** * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]]. @@ -132,7 +134,10 @@ class StreamTableEnvironment( } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] @@ -145,11 +150,16 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - translate[T](table)(TypeExtractor.createTypeInfo(clazz)) + val typeInfo = TypeExtractor.createTypeInfo(clazz) + TableEnvironment.validateType(typeInfo) + translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. 
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] @@ -162,7 +172,68 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { - translate[T](table)(typeInfo) + TableEnvironment.validateType(typeInfo) + translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. + * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. + */ + def toRetractStream[T](table: Table, clazz: Class[T]): + DataStream[JTuple2[JBool, T]] = { + + val typeInfo = TypeExtractor.createTypeInfo(clazz) + TableEnvironment.validateType(typeInfo) + val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo) + translate[JTuple2[JBool, T]]( + table, + updatesAsRetraction = true, + withChangeFlag = true)(resultType) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. + * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. 
+ */ + def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]): + DataStream[JTuple2[JBool, T]] = { + + TableEnvironment.validateType(typeInfo) + val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]]( + Types.BOOLEAN, + typeInfo + ) + translate[JTuple2[JBool, T]]( + table, + updatesAsRetraction = true, + withChangeFlag = true)(resultTypeInfo) } /** @@ -180,4 +251,5 @@ class StreamTableEnvironment( registerTableFunctionInternal[T](name, tf) } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 0113146..e8ecb6f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -17,8 +17,9 @@ */ package org.apache.flink.table.api.scala +import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{TableEnvironment, Table, TableConfig} +import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.expressions.Expression import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} @@ -127,11 +128,14 @@ class StreamTableEnvironment( } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * * @param table The [[Table]] to convert. @@ -139,7 +143,24 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { - asScalaStream(translate(table)) + val returnType = createTypeInformation[T] + asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType)) + } + +/** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. + * + * @param table The [[Table]] to convert. + * @tparam T The type of the requested data type. + * @return The converted [[DataStream]]. 
+ */ + def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { + val returnType = createTypeInformation[(Boolean, T)] + asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType)) } /** @@ -152,4 +173,5 @@ class StreamTableEnvironment( def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { registerTableFunctionInternal(name, tf) } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 2a0d571..5efff62 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -57,5 +57,24 @@ class TableConversions(table: Table) { } } + /** Converts the [[Table]] to a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. + * + */ + def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = { + + table.tableEnv match { + case tEnv: ScalaStreamTableEnv => + tEnv.toRetractStream(table) + case _ => + throw new TableException( + "Only tables that originate from Scala DataStreams " + + "can be converted to Scala DataStreams.") + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 0953611..ec160dc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -767,7 +767,9 @@ class Table( * * A batch [[Table]] can only be written to a * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.StreamTableSink]]. + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. * * @param sink The [[TableSink]] to which the [[Table]] is written. * @tparam T The data type that the [[TableSink]] expects. 
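A hedged illustration of the append-only requirement documented above (the query and field names are made up; `clicks` is assumed to be a Table with fields 'user and 'url, created e.g. via toTable as in the earlier sketch):

// projections and filters produce only insert changes, so an append conversion is valid
val appendOnly = clicks.select('user, 'url)
val appendStream = appendOnly.toDataStream[(String, String)]

// a grouped aggregation updates results it has already emitted; toDataStream would fail with
// "Table is not an append-only table. Output needs to handle update and delete changes.",
// so such a table has to be converted with toRetractStream instead
val updating = clicks.groupBy('user).select('user, 'url.count as 'cnt)
val retractStream = updating.toRetractStream[(String, Long)]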
http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 0bf723d..7dab44e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -24,16 +24,13 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexProgram -import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.nodes.CommonCalc import org.apache.flink.table.runtime.CRowFlatMapRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.types.Row /** * Flink RelNode which matches along with FlatMapOperator. @@ -96,11 +93,17 @@ class DataStreamCalc( calcProgram, config) + val inputParallelism = inputDataStream.getParallelism + val mapFunc = new CRowFlatMapRunner( genFunction.name, genFunction.code, CRowTypeInfo(outputRowType)) - inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) + inputDataStream + .flatMap(mapFunc) + .name(calcOpName(calcProgram, getExpressionString)) + // keep parallelism to ensure order of accumulate and retract messages + .setParallelism(inputParallelism) } } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index dff5099..c2e8a00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -125,7 +125,13 @@ class DataStreamCorrelate( collector.code, CRowTypeInfo(flatMap.returnType)) - inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) + val inputParallelism = inputDS.getParallelism + + inputDS + .flatMap(mapFunc) + // preserve input parallelism to ensure that acc and retract messages remain in order + .setParallelism(inputParallelism) + .name(correlateOpName(rexCall, sqlFunction, relRowType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 14ae343..8bff2a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -65,6 +65,8 @@ class DataStreamGroupAggregate( override def consumesRetractions = true + def getGroupings: Array[Int] = groupings + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupAggregate( cluster, http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index d503792..2f9e79e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -60,6 +60,10 @@ class DataStreamGroupWindowAggregate( override def consumesRetractions = true + def getGroupings: Array[Int] = grouping + + def getWindowProperties: Seq[NamedWindowProperty] = namedProperties + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupWindowAggregate( window, http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala index c3b43ba..173b7d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala @@ -82,19 +82,38 @@ object AccModeTrait { } /** - * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might - * produce. - * In [[AccMode.Acc]] the node only emit accumulate messages. - * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes, - * retraction messages for delete changes, and accumulate and retraction messages - * for update changes. + * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded + * by the messeages that an operator emits. 
*/ object AccMode extends Enumeration { type AccMode = Value - val Acc = Value // Operator produces only accumulate (insert) messages - val AccRetract = Value // Operator produces accumulate (insert, update) and - // retraction (delete, update) messages + /** + * An operator in [[Acc]] mode emits change messages as + * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row). + * + * An operator in [[Acc]] mode may only produce update and delete messages, if the table has + * a unique key and all key attributes are contained in the Row. + * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update + * - delete: (false, OldRow) // the Row includes the full unique key to idenify the row to delete + * + */ + val Acc = Value + + /** + * * An operator in [[AccRetract]] mode emits change messages as + * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row). + * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages! + * - delete: (false, OldRow) + * + */ + val AccRetract = Value } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala index 97c0dbb..f0b725d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala @@ -102,17 +102,17 @@ object DataStreamRetractionRules { val rel = call.rel(0).asInstanceOf[DataStreamRel] val traits = rel.getTraitSet - val traitsWithUpdateAsRetrac = + val traitsWithUpdateAsRetraction = if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) { traits.plus(UpdateAsRetractionTrait.DEFAULT) } else { traits } val traitsWithAccMode = - if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) { - traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT) + if (null == traitsWithUpdateAsRetraction.getTrait(AccModeTraitDef.INSTANCE)) { + traitsWithUpdateAsRetraction.plus(AccModeTrait.DEFAULT) } else { - traitsWithUpdateAsRetrac + traitsWithUpdateAsRetraction } if (traits != traitsWithAccMode) { @@ -122,8 +122,8 @@ object DataStreamRetractionRules { } /** - * Rule that annotates all [[DataStreamRel]] nodes that need to sent out update and delete - * changes as retraction messages. + * Rule that annotates all [[DataStreamRel]] nodes that need to sent out update changes with + * retraction messages. */ class SetUpdatesAsRetractionRule extends RelOptRule( operand( @@ -131,7 +131,7 @@ object DataStreamRetractionRules { "SetUpdatesAsRetractionRule") { /** - * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction + * Checks if a [[RelNode]] requires that update changes are sent with retraction * messages. 
*/ def needsUpdatesAsRetraction(node: RelNode): Boolean = { @@ -142,7 +142,7 @@ object DataStreamRetractionRules { } /** - * Annotates a [[RelNode]] to send out update and delete changes as retraction messages. + * Annotates a [[RelNode]] to send out update changes with retraction messages. */ def setUpdatesAsRetraction(relNode: RelNode): RelNode = { val traitSet = relNode.getTraitSet http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala index 66e51b1..ff3821a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala @@ -37,7 +37,7 @@ class CRowCorrelateFlatMapRunner( flatMapCode: String, collectorName: String, collectorCode: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichFlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[Any] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala index 9a4650b..9701cb9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory class CRowFlatMapRunner( name: String, code: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichFlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[FlatMapFunction[Row, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala index 8e95c93..109c6e1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory class CRowInputMapRunner[OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichMapFunction[CRow, OUT] with ResultTypeQueryable[OUT] with Compiler[MapFunction[Row, OUT]] { 
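To make the Acc/AccRetract encoding described above concrete, here is a small self-contained sketch (plain Scala, not part of this commit) that folds a sequence of (isAccumulate, (key, value)) change messages into the current per-key result; note how an update arrives as a retraction of the old row followed by an accumulation of the new row:

import scala.collection.mutable

object RetractEncodingExample {

  // applies accumulate/retract messages to a per-key view of the result
  def materialize(changes: Seq[(Boolean, (String, Long))]): Map[String, Long] = {
    val state = mutable.Map.empty[String, Long]
    changes.foreach {
      // accumulate message: add or replace the row for the key
      case (true, (key, value)) => state(key) = value
      // retract message: remove the previously emitted row
      case (false, (key, value)) => if (state.get(key).contains(value)) state.remove(key)
    }
    state.toMap
  }

  def main(args: Array[String]): Unit = {
    // e.g. a count per user: the second event for "alice" retracts (alice, 1) and accumulates (alice, 2)
    val changes = Seq(
      (true, ("alice", 1L)),
      (true, ("bob", 1L)),
      (false, ("alice", 1L)),
      (true, ("alice", 2L))
    )
    println(materialize(changes)) // Map(alice -> 2, bob -> 1)
  }
}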
http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala index 54bbf7e..7c96437 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.runtime +import java.lang.{Boolean => JBool} + import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable @@ -28,36 +30,63 @@ import org.apache.flink.types.Row import org.slf4j.LoggerFactory import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} - /** - * Convert [[CRow]] to a [[Tuple2]] + * Convert [[CRow]] to a [[JTuple2]] */ -class CRowInputTupleOutputMapRunner[OUT]( +class CRowInputJavaTupleOutputMapRunner( name: String, code: String, - @transient returnType: TypeInformation[JTuple2[Boolean, OUT]]) - extends RichMapFunction[CRow, JTuple2[Boolean, OUT]] - with ResultTypeQueryable[JTuple2[Boolean, OUT]] - with Compiler[MapFunction[Row, OUT]] { + @transient var returnType: TypeInformation[JTuple2[JBool, Any]]) + extends RichMapFunction[CRow, Any] + with ResultTypeQueryable[JTuple2[JBool, Any]] + with Compiler[MapFunction[Row, Any]] { val LOG = LoggerFactory.getLogger(this.getClass) - private var function: MapFunction[Row, OUT] = _ - private var tupleWrapper: JTuple2[Boolean, OUT] = _ + private var function: MapFunction[Row, Any] = _ + private var tupleWrapper: JTuple2[JBool, Any] = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating MapFunction.") function = clazz.newInstance() - tupleWrapper = new JTuple2[Boolean, OUT]() + tupleWrapper = new JTuple2[JBool, Any]() } - override def map(in: CRow): JTuple2[Boolean, OUT] = { + override def map(in: CRow): JTuple2[JBool, Any] = { tupleWrapper.f0 = in.change tupleWrapper.f1 = function.map(in.row) tupleWrapper } - override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType + override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType +} + +/** + * Convert [[CRow]] to a [[Tuple2]] + */ +class CRowInputScalaTupleOutputMapRunner( + name: String, + code: String, + @transient var returnType: TypeInformation[(Boolean, Any)]) + extends RichMapFunction[CRow, (Boolean, Any)] + with ResultTypeQueryable[(Boolean, Any)] + with Compiler[MapFunction[Row, Any]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, Any] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: CRow): (Boolean, Any) = + (in.change, function.map(in.row)) + + override def 
getProducedType: TypeInformation[(Boolean, Any)] = returnType } http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala index 966dea9..cb8f695 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory class CRowOutputMapRunner( name: String, code: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichMapFunction[Any, CRow] with ResultTypeQueryable[CRow] with Compiler[MapFunction[Any, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala index a0415e1..478b6b6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala @@ -32,7 +32,7 @@ class CorrelateFlatMapRunner[IN, OUT]( flatMapCode: String, collectorName: String, collectorCode: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichFlatMapFunction[IN, OUT] with ResultTypeQueryable[OUT] with Compiler[Any] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala index 715848d..67acc0b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory class FlatJoinRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichFlatJoinFunction[IN1, IN2, OUT] with ResultTypeQueryable[OUT] with Compiler[FlatJoinFunction[IN1, IN2, OUT]] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala index 2e37baf..938da59 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory class FlatMapRunner( name: String, code: String, - @transient returnType: TypeInformation[Row]) + @transient var returnType: TypeInformation[Row]) extends RichFlatMapFunction[Row, Row] with ResultTypeQueryable[Row] with Compiler[FlatMapFunction[Row, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala index 644e855..5f3dbb4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala @@ -24,7 +24,7 @@ import org.apache.flink.util.Collector class MapJoinLeftRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + returnType: TypeInformation[OUT], broadcastSetName: String) extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala index eee38d1..e2d9331 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala @@ -24,7 +24,7 @@ import org.apache.flink.util.Collector class MapJoinRightRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + returnType: TypeInformation[OUT], broadcastSetName: String) extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) { http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala index 51e2fc5..f81368a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory class MapRunner[IN, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] with Compiler[MapFunction[IN, OUT]] { 
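A side note on the new CRowInputScalaTupleOutputMapRunner above: the (Boolean, value) pairs it emits follow the change-flag convention (true = accumulate/insert, false = retract/delete). Below is a minimal, Flink-independent sketch of materializing such a change stream; the String values stand in for rows and everything here is made up for illustration.

object RetractStreamSketch {
  def main(args: Array[String]): Unit = {
    // "a" inserted, "b" inserted, "a" retracted again
    val changes: Seq[(Boolean, String)] = Seq((true, "a"), (true, "b"), (false, "a"))

    val materialized = changes.foldLeft(Set.empty[String]) {
      case (acc, (true, value))  => acc + value // accumulate message: add the value
      case (acc, (false, value)) => acc - value // retract message: remove the value
    }

    println(materialized) // Set(b)
  }
}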
http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala index 090e184..00b7b8e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + @transient var returnType: TypeInformation[OUT], broadcastSetName: String) extends RichFlatMapFunction[MULTI_IN, OUT] with ResultTypeQueryable[OUT] http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala index 22a2682..9bcac30 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala @@ -41,7 +41,7 @@ class DataSetSessionWindowAggregatePreProcessor( genAggregations: GeneratedAggregationsFunction, keysAndAggregatesArity: Int, gap: Long, - @transient intermediateRowType: TypeInformation[Row]) + @transient var intermediateRowType: TypeInformation[Row]) extends AbstractRichFunction with MapPartitionFunction[Row,Row] with GroupCombineFunction[Row,Row] http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index 745f24d..6ee37e6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -17,6 +17,8 @@ */ package org.apache.flink.table.runtime.aggregate +import java.lang.{Long => JLong} + import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row @@ -24,6 +26,7 @@ import org.apache.flink.util.Collector import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.api.Types import 
org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.slf4j.LoggerFactory import org.apache.flink.table.runtime.types.CRow @@ -47,7 +50,10 @@ class GroupAggProcessFunction( private var newRow: CRow = _ private var prevRow: CRow = _ private var firstRow: Boolean = _ + // stores the accumulators private var state: ValueState[Row] = _ + // counts the number of added and retracted input records + private var cntState: ValueState[JLong] = _ override def open(config: Configuration) { LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + @@ -65,6 +71,9 @@ class GroupAggProcessFunction( val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType) state = getRuntimeContext.getState(stateDescriptor) + val inputCntDescriptor: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG) + cntState = getRuntimeContext.getState(inputCntDescriptor) } override def processElement( @@ -74,11 +83,14 @@ class GroupAggProcessFunction( val input = inputC.row - // get accumulators + // get accumulators and input counter var accumulators = state.value() + var inputCnt = cntState.value() + if (null == accumulators) { firstRow = true accumulators = function.createAccumulators() + inputCnt = 0L } else { firstRow = false } @@ -92,29 +104,44 @@ class GroupAggProcessFunction( // update aggregate result and set to the newRow if (inputC.change) { + inputCnt += 1 // accumulate input function.accumulate(accumulators, input) function.setAggregationResults(accumulators, newRow.row) } else { + inputCnt -= 1 // retract input function.retract(accumulators, input) function.setAggregationResults(accumulators, newRow.row) } - // update accumulators - state.update(accumulators) - - // if previousRow is not null, do retraction process - if (generateRetraction && !firstRow) { - if (prevRow.row.equals(newRow.row)) { - // ignore same newRow - return - } else { - // retract previous row - out.collect(prevRow) + if (inputCnt != 0) { + // we aggregated at least one record for this key + + // update the state + state.update(accumulators) + cntState.update(inputCnt) + + // if this was not the first row and we have to emit retractions + if (generateRetraction && !firstRow) { + if (prevRow.row.equals(newRow.row)) { + // newRow is the same as before. 
Do not emit retraction and acc messages
+          return
+        } else {
+          // retract previous result
+          out.collect(prevRow)
+        }
       }
-    }
+      // emit the new result
+      out.collect(newRow)

-    out.collect(newRow)
+    } else {
+      // we retracted the last record for this key
+      // send out a delete message
+      out.collect(prevRow)
+      // and clear all state
+      state.clear()
+      cntState.clear()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
index ec73fa6..1cb3a6e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory
 class CRowValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[CRow])
+    @transient var returnType: TypeInformation[CRow])
   extends GenericInputFormat[CRow]
   with NonParallelInput
   with ResultTypeQueryable[CRow]

http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index d536b39..43ce605 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
 class ValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[Row])
+    @transient var returnType: TypeInformation[Row])
   extends GenericInputFormat[Row]
   with NonParallelInput
   with ResultTypeQueryable[Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
new file mode 100644
index 0000000..abdca17
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.Table + +/** + * Defines an external [[TableSink]] to emit streaming [[Table]] with only insert changes. + * + * If the [[Table]] is also modified by update or delete changes, a + * [[org.apache.flink.table.api.TableException]] will be thrown. + * + * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. + */ +trait AppendStreamTableSink[T] extends TableSink[T] { + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[T]): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/e265620d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala index 809afd2..7214394 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala @@ -25,7 +25,6 @@ import org.apache.flink.types.Row import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} /** * A simple [[TableSink]] to emit data as CSV files. @@ -40,7 +39,7 @@ class CsvTableSink( fieldDelim: Option[String], numFiles: Option[Int], writeMode: Option[WriteMode]) - extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] { + extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] { /** * A simple [[TableSink]] to emit data as CSV files. @@ -134,100 +133,3 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { builder.mkString } } - -/** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter - * @param numFiles The number of files to write to - * @param writeMode The write mode to specify whether existing files are overwritten or not. - */ -class CsvRetractTableSink( - path: String, - fieldDelim: Option[String], - numFiles: Option[Int], - writeMode: Option[WriteMode]) - extends TableSinkBase[Row] with StreamRetractSink[Row] { - - override def needsUpdatesAsRetraction: Boolean = true - - /** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter, ',' by default. - */ - def this(path: String, fieldDelim: String = ",") { - this(path, Some(fieldDelim), None, None) - } - - /** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter. - * @param numFiles The number of files to write to. 
- * @param writeMode The write mode to specify whether existing files are overwritten or not. - */ - def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) { - this(path, Some(fieldDelim), Some(numFiles), Some(writeMode)) - } - - - override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,Row]]): Unit = { - val csvRows = dataStream - .map(new CsvRetractFormatter(fieldDelim.getOrElse(","))) - .returns(TypeInformation.of(classOf[String])) - - - if (numFiles.isDefined) { - csvRows.setParallelism(numFiles.get) - } - - val sink = writeMode match { - case None => csvRows.writeAsText(path) - case Some(wm) => csvRows.writeAsText(path, wm) - } - - if (numFiles.isDefined) { - sink.setParallelism(numFiles.get) - } - } - - override protected def copy: TableSinkBase[Row] = { - new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode) - } - - override def getOutputType: TypeInformation[Row] = { - new RowTypeInfo(getFieldTypes: _*) - } -} - -/** - * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the - * field delimiter. - * - * @param fieldDelim The field delimiter. - */ -class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean,Row], String] { - override def map(rowT: JTuple2[Boolean,Row]): String = { - - val row: Row = rowT.f1 - - val builder = new StringBuilder - - builder.append(rowT.f0.toString) - - // write following values - for (i <- 0 until row.getArity) { - builder.append(fieldDelim) - val v = row.getField(i) - if (v != null) { - builder.append(v.toString) - } - } - builder.mkString - } -} -
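Finally, a minimal sketch of what a user-defined sink against the new AppendStreamTableSink interface could look like, modeled on the CsvTableSink changes above. The class name PrintAppendSink is hypothetical, and the sketch assumes TableSinkBase supplies getFieldTypes and the configure/copy machinery exactly as CsvTableSink uses it.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSinkBase}
import org.apache.flink.types.Row

/** Hypothetical append-only sink that simply prints every inserted row. */
class PrintAppendSink extends TableSinkBase[Row] with AppendStreamTableSink[Row] {

  /** Emits the insert-only DataStream. */
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    dataStream.print()
  }

  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes: _*)

  override protected def copy: TableSinkBase[Row] = new PrintAppendSink
}

An insert-only query could then be emitted with something like result.writeToSink(new PrintAppendSink), assuming the usual Table.writeToSink registration path applies; a query that also produces update or delete changes would instead fail with the TableException described in the AppendStreamTableSink scaladoc above.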