flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/3] flink git commit: [FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.
Date Wed, 26 Apr 2017 11:50:11 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
new file mode 100644
index 0000000..b2b062e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * The collector is used to wrap a [[Row]] to a [[CRow]]
+  */
+class CRowWrappingCollector() extends Collector[Row] {
+
+  var out: Collector[CRow] = _
+  val outCRow: CRow = new CRow()
+
+  def setChange(change: Boolean): Unit = this.outCRow.change = change
+
+  override def collect(record: Row): Unit = {
+    outCRow.row = record
+    out.collect(outCRow)
+  }
+
+  override def close(): Unit = out.close()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index b446306..2e37baf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -24,20 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 
-class FlatMapRunner[IN, OUT](
+class FlatMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends RichFlatMapFunction[IN, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[FlatMapFunction[IN, OUT]] {
+    @transient returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row]
+  with Compiler[FlatMapFunction[Row, Row]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: FlatMapFunction[IN, OUT] = _
+  private var function: FlatMapFunction[Row, Row] = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
@@ -48,10 +49,10 @@ class FlatMapRunner[IN, OUT](
     FunctionUtils.openFunction(function, parameters)
   }
 
-  override def flatMap(in: IN, out: Collector[OUT]): Unit =
+  override def flatMap(in: Row, out: Collector[Row]): Unit =
     function.flatMap(in, out)
 
-  override def getProducedType: TypeInformation[OUT] = returnType
+  override def getProducedType: TypeInformation[Row] = returnType
 
   override def close(): Unit = {
     FunctionUtils.closeFunction(function)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index c608b97..88cf2f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.aggregate
 import java.util.{ArrayList => JArrayList, List => JList}
 import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
 /**
@@ -29,12 +30,12 @@ import org.apache.flink.types.Row
   *
   * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
   *                         used for this aggregation
-  * @param aggFields   the position (in the input Row) of the input value for each aggregate
+  * @param aggFields   the position (in the input [[CRow]]) of the input value for each aggregate
   */
 class AggregateAggFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Array[Int]])
-  extends DataStreamAggFunc[Row, Row, Row] {
+  extends DataStreamAggFunc[CRow, Row, Row] {
 
   override def createAccumulator(): Row = {
     val accumulatorRow: Row = new Row(aggregates.length)
@@ -46,12 +47,14 @@ class AggregateAggFunction(
     accumulatorRow
   }
 
-  override def add(value: Row, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Unit = {
+
+    val row = value.row
 
     var i = 0
     while (i < aggregates.length) {
       val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
-      val v = value.getField(aggFields(i)(0))
+      val v = row.getField(aggFields(i)(0))
       aggregates(i).accumulate(acc, v)
       i += 1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 634f7c8..f7601be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -41,6 +41,7 @@ import org.apache.flink.table.functions.aggfunctions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
 import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
@@ -65,13 +66,13 @@ object AggregateUtil {
     * @param isPartitioned It is a tag that indicate whether the input is partitioned
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
-  private[flink] def createUnboundedOverProcessFunction(
+  private[flink] def createUnboundedOverProcessFunction[T](
     generator: CodeGenerator,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
     isRowTimeType: Boolean,
     isPartitioned: Boolean,
-    isRowsClause: Boolean): ProcessFunction[Row, Row] = {
+    isRowsClause: Boolean): ProcessFunction[CRow, CRow] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
@@ -102,13 +103,13 @@ object AggregateUtil {
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
-          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+          CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType)))
       } else {
         // RANGE unbounded over process function
         new RowTimeUnboundedRangeOver(
           genFunction,
           aggregationStateType,
-          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+          CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType)))
       }
     } else {
       if (isPartitioned) {
@@ -135,13 +136,15 @@ object AggregateUtil {
   private[flink] def createGroupAggregateFunction(
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
-      groupings: Array[Int]): ProcessFunction[Row, Row] = {
+      groupings: Array[Int],
+      genereateRetraction: Boolean,
+      consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
         namedAggregates.map(_.getKey),
         inputType,
-        needRetraction = false)
+        consumeRetraction)
 
     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
 
@@ -149,7 +152,8 @@ object AggregateUtil {
       aggregates,
       aggFields,
       groupings,
-      aggregationStateType)
+      aggregationStateType,
+      genereateRetraction)
   }
 
   /**
@@ -160,17 +164,17 @@ object AggregateUtil {
     * @param namedAggregates List of calls to aggregate functions and their output field names
     * @param inputType       Input row type
     * @param precedingOffset the preceding offset
-    * @param isRowsClause   It is a tag that indicates whether the OVER clause is ROWS clause
+    * @param isRowsClause    It is a tag that indicates whether the OVER clause is ROWS clause
     * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
-  private[flink] def createBoundedOverProcessFunction(
+  private[flink] def createBoundedOverProcessFunction[T](
     generator: CodeGenerator,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
     precedingOffset: Long,
     isRowsClause: Boolean,
-    isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+    isRowTimeType: Boolean): ProcessFunction[CRow, CRow] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
@@ -179,7 +183,7 @@ object AggregateUtil {
         needRetraction = true)
 
     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
-    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+    val inputRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType))
 
     val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
     val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -730,7 +734,7 @@ object AggregateUtil {
       window: LogicalWindow,
       finalRowArity: Int,
       properties: Seq[NamedWindowProperty])
-    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+    : AllWindowFunction[Row, CRow, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
       val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -738,7 +742,7 @@ object AggregateUtil {
         startPos,
         endPos,
         finalRowArity)
-        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+        .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]]
     } else {
       new IncrementalAggregateAllWindowFunction(
         finalRowArity)
@@ -754,7 +758,7 @@ object AggregateUtil {
       numAggregates: Int,
       finalRowArity: Int,
       properties: Seq[NamedWindowProperty])
-    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+    : WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
       val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -764,7 +768,7 @@ object AggregateUtil {
         startPos,
         endPos,
         finalRowArity)
-        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+        .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
     } else {
       new IncrementalAggregateWindowFunction(
         numGroupingKeys,
@@ -778,7 +782,7 @@ object AggregateUtil {
       inputType: RelDataType,
       outputType: RelDataType,
       groupKeysIndex: Array[Int])
-    : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
+    : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index d108570..8a4630c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -66,7 +66,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
 
   private var aggregateBuffer: Row = _
   private var output: Row = _
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   private val accumStartPos: Int = groupKeysMapping.length
   private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2
   private val intermediateRowWindowStartPos = intermediateRowArity - 2
@@ -79,7 +79,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
   override def open(config: Configuration) {
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
-    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
 
     // init lists with two empty accumulators
     for (i <- aggregates.indices) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index d6bc006..c6d74c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -55,7 +55,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(groupKeysMapping)
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   private var output: Row = _
   private val accumulatorStartPos: Int = groupKeysMapping.length
   protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
@@ -66,7 +66,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
 
   override def open(config: Configuration) {
     output = new Row(finalRowArity)
-    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
 
     // init lists with two empty accumulators
     var i = 0

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 99e2a0a..37e5730 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -54,7 +54,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(groupKeysMapping)
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = _
   private var output: Row = _
   private val accumStartPos: Int = groupKeysMapping.length
@@ -69,7 +69,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   override def open(config: Configuration) {
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
 
     // init lists with two empty accumulators
     for (i <- aggregates.indices) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 3813aa0..c0652ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.runtime.types.CRow
 
 /**
   * Aggregate Function used for the groupby (without window) aggregate
@@ -41,60 +42,95 @@ class GroupAggProcessFunction(
     private val aggregates: Array[AggregateFunction[_]],
     private val aggFields: Array[Array[Int]],
     private val groupings: Array[Int],
-    private val aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row] {
+    private val aggregationStateType: RowTypeInfo,
+    private val generateRetraction: Boolean)
+  extends ProcessFunction[CRow, CRow] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
-  private var output: Row = _
+  private var newRow: CRow = _
+  private var prevRow: CRow = _
+  private var firstRow: Boolean = _
   private var state: ValueState[Row] = _
 
   override def open(config: Configuration) {
-    output = new Row(groupings.length + aggregates.length)
+    newRow = new CRow(new Row(groupings.length + aggregates.length), true)
+    prevRow = new CRow(new Row(groupings.length + aggregates.length), false)
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
     state = getRuntimeContext.getState(stateDescriptor)
   }
 
   override def processElement(
-      input: Row,
-      ctx: ProcessFunction[Row, Row]#Context,
-      out: Collector[Row]): Unit = {
+      input: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
 
     var i = 0
 
     var accumulators = state.value()
 
     if (null == accumulators) {
+      firstRow = true
       accumulators = new Row(aggregates.length)
       i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
+    } else {
+      firstRow = false
     }
 
-    // Set group keys value to the final output
+    // Set group keys value to the newRow and prevRow
     i = 0
     while (i < groupings.length) {
-      output.setField(i, input.getField(groupings(i)))
+      newRow.row.setField(i, input.row.getField(groupings(i)))
+      prevRow.row.setField(i, input.row.getField(groupings(i)))
       i += 1
     }
 
-    // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    // Set previous aggregate result to the prevRow
+    // Set current aggregate result to the newRow
+    if (input.change) {
+      // accumulate input
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        prevRow.row.setField(index, aggregates(i).getValue(accumulator))
+        aggregates(i).accumulate(accumulator, input.row.getField(aggFields(i)(0)))
+        newRow.row.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      // retract input
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        prevRow.row.setField(index, aggregates(i).getValue(accumulator))
+        aggregates(i).retract(accumulator, input.row.getField(aggFields(i)(0)))
+        newRow.row.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
+
     state.update(accumulators)
 
-    out.collect(output)
-  }
+    // if previousRow is not null, do retraction process
+    if (generateRetraction && !firstRow) {
+      if (prevRow.row.equals(newRow.row)) {
+        // ignore same newRow
+        return
+      } else {
+        // retract previous row
+        out.collect(prevRow)
+      }
+    }
 
+    out.collect(newRow)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index ec9b654..711cc05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -22,6 +22,7 @@ import java.lang.Iterable
 import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -39,17 +40,17 @@ class IncrementalAggregateAllTimeWindowFunction(
   extends IncrementalAggregateAllWindowFunction[TimeWindow](
     finalRowArity) {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
     super.open(parameters)
   }
 
   override def apply(
       window: TimeWindow,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     // set collector and window
     collector.wrappedCollector = out

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index f92be92..c190785 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -32,12 +33,12 @@ import org.apache.flink.util.Collector
   */
 class IncrementalAggregateAllWindowFunction[W <: Window](
     private val finalRowArity: Int)
-  extends RichAllWindowFunction[Row, Row, W] {
+  extends RichAllWindowFunction[Row, CRow, W] {
 
-  private var output: Row = _
+  private var output: CRow = _
 
   override def open(parameters: Configuration): Unit = {
-    output = new Row(finalRowArity)
+    output = new CRow(new Row(finalRowArity), true)
   }
 
   /**
@@ -47,7 +48,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
   override def apply(
       window: W,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     val iterator = records.iterator
 
@@ -55,7 +56,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
       val record = iterator.next()
       var i = 0
       while (i < record.getArity) {
-        output.setField(i, record.getField(i))
+        output.row.setField(i, record.getField(i))
         i += 1
       }
       out.collect(output)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index dccb4f6..809bbfd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -43,10 +44,10 @@ class IncrementalAggregateTimeWindowFunction(
     numAggregates,
     finalRowArity) {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
     super.open(parameters)
   }
 
@@ -54,7 +55,7 @@ class IncrementalAggregateTimeWindowFunction(
       key: Tuple,
       window: TimeWindow,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     // set collector and window
     collector.wrappedCollector = out

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 983efb3..7e9d738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -24,6 +24,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -37,12 +38,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
     private val numGroupingKey: Int,
     private val numAggregates: Int,
     private val finalRowArity: Int)
-  extends RichWindowFunction[Row, Row, Tuple, W] {
+  extends RichWindowFunction[Row, CRow, Tuple, W] {
 
-  private var output: Row = _
+  private var output: CRow = _
 
   override def open(parameters: Configuration): Unit = {
-    output = new Row(finalRowArity)
+    output = new CRow(new Row(finalRowArity), true)
   }
 
   /**
@@ -53,7 +54,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
       key: Tuple,
       window: W,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     val iterator = records.iterator
 
@@ -62,12 +63,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
 
       var i = 0
       while (i < numGroupingKey) {
-        output.setField(i, key.getField(i))
+        output.row.setField(i, key.getField(i))
         i += 1
       }
       i = 0
       while (i < numAggregates) {
-        output.setField(numGroupingKey + i, record.getField(i))
+        output.row.setField(numGroupingKey + i, record.getField(i))
         i += 1
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 7f87e50..c7adc09 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,7 +31,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{ArrayList, List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -47,10 +48,10 @@ class ProcTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingTimeBoundary: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
-  private var output: Row = _
+  private var output: CRow = _
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
 
@@ -66,11 +67,12 @@ class ProcTimeBoundedRangeOver(
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
     // We keep the elements received in a MapState indexed based on their ingestion time
     val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+        .asInstanceOf[TypeInformation[JList[Row]]]
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -82,9 +84,9 @@ class ProcTimeBoundedRangeOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    input: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
 
     val currentTime = ctx.timerService.currentProcessingTime
     // buffer the event incoming event
@@ -97,15 +99,15 @@ class ProcTimeBoundedRangeOver(
       // register timer to process event once the current millisecond passed
       ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
     }
-    rowList.add(input)
+    rowList.add(input.row)
     rowMapState.put(currentTime, rowList)
 
   }
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
 
     // we consider the original timestamp of events that have registered this time trigger 1 ms ago
     val currentTime = timestamp - 1
@@ -166,10 +168,10 @@ class ProcTimeBoundedRangeOver(
       val input = currentElements.get(iElemenets)
 
       // set the fields of the last event to carry on with the aggregates
-      function.setForwardedFields(input, output)
+      function.setForwardedFields(input, output.row)
 
       // add the accumulators values to result
-      function.setAggregationResults(accumulators, output)
+      function.setAggregationResults(accumulators, output.row)
       out.collect(output)
       iElemenets += 1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index 31cfd73..0c7f44e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingOffset: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkArgument(precedingOffset > 0)
 
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
-  private var output: Row = _
+  private var output: CRow = _
   private var counterState: ValueState[Long] = _
   private var smallestTsState: ValueState[Long] = _
 
@@ -73,13 +74,14 @@ class ProcTimeBoundedRowsOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     // We keep the elements received in a Map state keyed
     // by the ingestion time in the operator.
     // we also keep counter of processed elements
     // and timestamp of oldest element
     val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+        .asInstanceOf[TypeInformation[JList[Row]]]
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
@@ -100,9 +102,11 @@ class ProcTimeBoundedRowsOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     val currentTime = ctx.timerService.currentProcessingTime
 
@@ -154,11 +158,11 @@ class ProcTimeBoundedRowsOver(
     }
 
     // copy forwarded fields in output row
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     // accumulate current row and set aggregate in output row
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     // update map state, accumulator state, counter and timestamp
     val currentTimeState = rowMapState.get(currentTime)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 6b9800b..35618dc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 
 /**
@@ -37,12 +38,12 @@ import org.slf4j.LoggerFactory
 class ProcTimeUnboundedNonPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with CheckpointedFunction
     with Compiler[GeneratedAggregations] {
 
   private var accumulators: Row = _
-  private var output: Row = _
+  private var output: CRow = _
   private var state: ListState[Row] = _
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -58,7 +59,7 @@ class ProcTimeUnboundedNonPartitionedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     if (null == accumulators) {
       val it = state.get().iterator()
       if (it.hasNext) {
@@ -70,14 +71,16 @@ class ProcTimeUnboundedNonPartitionedOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     out.collect(output)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
index 9baa6a3..847c1bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -24,7 +24,8 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
 import org.slf4j.LoggerFactory
 
 /**
@@ -36,10 +37,10 @@ import org.slf4j.LoggerFactory
 class ProcTimeUnboundedPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  private var output: Row = _
+  private var output: CRow = _
   private var state: ValueState[Row] = _
   val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
@@ -54,16 +55,18 @@ class ProcTimeUnboundedPartitionedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("overState", aggregationStateType)
     state = getRuntimeContext.getState(stateDescriptor)
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     var accumulators = state.value()
 
@@ -71,13 +74,12 @@ class ProcTimeUnboundedPartitionedOver(
       accumulators = function.createAccumulators()
     }
 
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     state.update(accumulators)
-
     out.collect(output)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 03ca02c..4020d44 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.util.{List => JList, ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
 
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
 import org.slf4j.LoggerFactory
@@ -40,14 +41,14 @@ import org.slf4j.LoggerFactory
 class RowTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
+    inputRowType: CRowTypeInfo,
     precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
-  private var output: Row = _
+  private var output: CRow = _
 
   // the state which keeps the last triggering timestamp
   private var lastTriggeringTsState: ValueState[Long] = _
@@ -74,7 +75,7 @@ class RowTimeBoundedRangeOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
     val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
       new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -86,7 +87,8 @@ class RowTimeBoundedRangeOver(
 
     val keyTypeInformation: TypeInformation[Long] =
       BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]](
@@ -98,9 +100,11 @@ class RowTimeBoundedRangeOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
@@ -125,8 +129,8 @@ class RowTimeBoundedRangeOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
 
@@ -172,13 +176,13 @@ class RowTimeBoundedRangeOver(
       }
 
       // set aggregate in output row
-      function.setAggregationResults(accumulators, output)
+      function.setAggregationResults(accumulators, output.row)
 
       // copy forwarded fields to output row and emit output row
       dataListIndex = 0
       while (dataListIndex < inputs.size()) {
         aggregatesIndex = 0
-        function.setForwardedFields(inputs.get(dataListIndex), output)
+        function.setForwardedFields(inputs.get(dataListIndex), output.row)
         out.collect(output)
         dataListIndex += 1
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 4a9a14c..5ec6ec7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -27,7 +27,8 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory
 class RowTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
+    inputRowType: CRowTypeInfo,
     precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
-  private var output: Row = _
+  private var output: CRow = _
 
   // the state which keeps the last triggering timestamp
   private var lastTriggeringTsState: ValueState[Long] = _
@@ -79,7 +80,7 @@ class RowTimeBoundedRowsOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
     val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
       new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -95,7 +96,8 @@ class RowTimeBoundedRowsOver(
 
     val keyTypeInformation: TypeInformation[Long] =
       BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]](
@@ -107,9 +109,11 @@ class RowTimeBoundedRowsOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
@@ -134,8 +138,8 @@ class RowTimeBoundedRowsOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
 
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
@@ -189,7 +193,7 @@ class RowTimeBoundedRowsOver(
         }
 
         // copy forwarded fields to output row
-        function.setForwardedFields(input, output)
+        function.setForwardedFields(input, output.row)
 
         // retract old row from accumulators
         if (null != retractRow) {
@@ -198,7 +202,7 @@ class RowTimeBoundedRowsOver(
 
         // accumulate current row and set aggregate in output row
         function.accumulate(accumulators, input)
-        function.setAggregationResults(accumulators, output)
+        function.setAggregationResults(accumulators, output.row)
         i += 1
 
         out.collect(output)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 525d4d7..3e2a811 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -28,7 +28,8 @@ import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 
@@ -42,11 +43,11 @@ import org.slf4j.LoggerFactory
 abstract class RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  protected var output: Row = _
+  protected var output: CRow = _
   // state to hold the accumulators of the aggregations
   private var accumulatorState: ValueState[Row] = _
   // state to hold rows until the next watermark arrives
@@ -67,7 +68,7 @@ abstract class RowTimeUnboundedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     sortedTimestamps = new util.LinkedList[Long]()
 
     // initialize accumulator state
@@ -76,7 +77,8 @@ abstract class RowTimeUnboundedOver(
     accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
 
     // initialize row state
-    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -87,15 +89,17 @@ abstract class RowTimeUnboundedOver(
     * Puts an element from the input stream into state if it is not late.
     * Registers a timer for the next watermark.
     *
-    * @param input The input value.
+    * @param inputC The input value.
     * @param ctx   The ctx to register timer or get current time
     * @param out   The collector for returning result values.
     *
     */
   override def processElement(
-     input: Row,
-     ctx:  ProcessFunction[Row, Row]#Context,
-     out: Collector[Row]): Unit = {
+     inputC: CRow,
+     ctx:  ProcessFunction[CRow, CRow]#Context,
+     out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     val timestamp = ctx.timestamp()
     val curWatermark = ctx.timerService().currentWatermark()
@@ -126,11 +130,11 @@ abstract class RowTimeUnboundedOver(
     */
   override def onTimer(
       timestamp: Long,
-      ctx: ProcessFunction[Row, Row]#OnTimerContext,
-      out: Collector[Row]): Unit = {
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
 
-    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
-    val collector = out.asInstanceOf[TimestampedCollector[Row]]
+    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
+    val collector = out.asInstanceOf[TimestampedCollector[CRow]]
 
     val keyIterator = rowMapState.keys.iterator
     if (keyIterator.hasNext) {
@@ -206,7 +210,7 @@ abstract class RowTimeUnboundedOver(
   def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit
+    out: Collector[CRow]): Unit
 
 }
 
@@ -217,7 +221,7 @@ abstract class RowTimeUnboundedOver(
 class RowTimeUnboundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
+    inputType: TypeInformation[CRow])
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
@@ -226,7 +230,7 @@ class RowTimeUnboundedRowsOver(
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
+    out: Collector[CRow]): Unit = {
 
     var i = 0
     while (i < curRowList.size) {
@@ -234,11 +238,11 @@ class RowTimeUnboundedRowsOver(
 
       var j = 0
       // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
+      function.setForwardedFields(curRow, output.row)
 
       // update accumulators and copy aggregates to output row
       function.accumulate(lastAccumulator, curRow)
-      function.setAggregationResults(lastAccumulator, output)
+      function.setAggregationResults(lastAccumulator, output.row)
       // emit output row
       out.collect(output)
       i += 1
@@ -255,7 +259,7 @@ class RowTimeUnboundedRowsOver(
 class RowTimeUnboundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
+    inputType: TypeInformation[CRow])
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
@@ -264,7 +268,7 @@ class RowTimeUnboundedRangeOver(
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
+    out: Collector[CRow]): Unit = {
 
     var i = 0
     // all same timestamp data should have same aggregation value.
@@ -281,10 +285,10 @@ class RowTimeUnboundedRangeOver(
       val curRow = curRowList.get(i)
 
       // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
+      function.setForwardedFields(curRow, output.row)
 
       //copy aggregates to output row
-      function.setAggregationResults(lastAccumulator, output)
+      function.setAggregationResults(lastAccumulator, output.row)
       out.collect(output)
       i += 1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 9502607..0c8ae00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
@@ -26,29 +27,48 @@ import org.apache.flink.util.Collector
   * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
   * collector.
   */
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
-    extends Collector[Row] {
+abstract class TimeWindowPropertyCollector[T](
+    windowStartOffset: Option[Int],
+    windowEndOffset: Option[Int])
+  extends Collector[T] {
 
-  var wrappedCollector: Collector[Row] = _
+  var wrappedCollector: Collector[T] = _
+  var output: Row = _
   var windowStart:Long = _
   var windowEnd:Long = _
 
-  override def collect(record: Row): Unit = {
+  def getRow(record: T): Row
 
-    val lastFieldPos = record.getArity - 1
+  override def collect(record: T): Unit = {
+
+    output = getRow(record)
+    val lastFieldPos = output.getArity - 1
 
     if (windowStartOffset.isDefined) {
-      record.setField(
+      output.setField(
         lastFieldPos + windowStartOffset.get,
         SqlFunctions.internalToTimestamp(windowStart))
     }
     if (windowEndOffset.isDefined) {
-      record.setField(
+      output.setField(
         lastFieldPos + windowEndOffset.get,
         SqlFunctions.internalToTimestamp(windowEnd))
     }
+
     wrappedCollector.collect(record)
   }
 
   override def close(): Unit = wrappedCollector.close()
 }
+
+class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+  extends TimeWindowPropertyCollector[Row](startOffset, endOffset) {
+
+  override def getRow(record: Row): Row = record
+}
+
+class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+  extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) {
+
+  override def getRow(record: CRow): Row = record.row
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
new file mode 100644
index 0000000..ec73fa6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+class CRowValuesInputFormat(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends GenericInputFormat[CRow]
+  with NonParallelInput
+  with ResultTypeQueryable[CRow]
+  with Compiler[GenericInputFormat[Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var format: GenericInputFormat[Row] = _
+
+  override def open(split: GenericInputSplit): Unit = {
+    LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating GenericInputFormat.")
+    format = clazz.newInstance()
+  }
+
+  override def reachedEnd(): Boolean = format.reachedEnd()
+
+  override def nextRecord(reuse: CRow): CRow = {
+    reuse.row = format.nextRecord(reuse.row)
+    reuse.change = true
+    reuse
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index 1a339e6..d536b39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -23,20 +23,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 
-class ValuesInputFormat[OUT](
+class ValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends GenericInputFormat[OUT]
+    @transient returnType: TypeInformation[Row])
+  extends GenericInputFormat[Row]
   with NonParallelInput
-  with ResultTypeQueryable[OUT]
-  with Compiler[GenericInputFormat[OUT]] {
+  with ResultTypeQueryable[Row]
+  with Compiler[GenericInputFormat[Row]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var format: GenericInputFormat[OUT] = _
+  private var format: GenericInputFormat[Row] = _
 
   override def open(split: GenericInputSplit): Unit = {
     LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
@@ -47,7 +48,7 @@ class ValuesInputFormat[OUT](
 
   override def reachedEnd(): Boolean = format.reachedEnd()
 
-  override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
+  override def nextRecord(reuse: Row): Row = format.nextRecord(reuse)
 
-  override def getProducedType: TypeInformation[OUT] = returnType
+  override def getProducedType: TypeInformation[Row] = returnType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
new file mode 100644
index 0000000..25ec8c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.types.Row
+
+/**
+  * Wrapper for a [[Row]] to add retraction information.
+  *
+  * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a
+  * retraction message.
+  *
+  * @param row The wrapped [[Row]].
+  * @param change true for an accumulate message, false for a retraction message.
+  */
+class CRow(var row: Row, var change: Boolean) {
+
+  def this() {
+    this(null, true)
+  }
+
+  override def toString: String = s"${if(change) "+" else "-"}$row"
+
+  override def equals(other: scala.Any): Boolean = {
+    val otherCRow = other.asInstanceOf[CRow]
+    row.equals(otherCRow.row) && change == otherCRow.change
+  }
+}
+
+object CRow {
+
+  def apply(): CRow = {
+    new CRow()
+  }
+
+  def apply(row: Row, change: Boolean): CRow = {
+    new CRow(row, change)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
new file mode 100644
index 0000000..d848c65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
+import org.apache.flink.types.Row
+
+class CRowComparator(val rowComp: TypeComparator[Row]) extends TypeComparator[CRow] {
+
+  override def hash(record: CRow): Int = rowComp.hash(record.row)
+
+  override def setReference(toCompare: CRow): Unit = rowComp.setReference(toCompare.row)
+
+  override def equalToReference(candidate: CRow): Boolean = rowComp.equalToReference(candidate.row)
+
+  override def compareToReference(otherComp: TypeComparator[CRow]): Int = {
+    val otherCRowComp = otherComp.asInstanceOf[CRowComparator]
+    rowComp.compareToReference(otherCRowComp.rowComp)
+  }
+
+  override def compare(first: CRow, second: CRow): Int = {
+    rowComp.compare(first.row, second.row)
+  }
+
+  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+    rowComp.compareSerialized(firstSource, secondSource)
+  }
+
+  override def supportsNormalizedKey(): Boolean = rowComp.supportsNormalizedKey()
+
+  override def supportsSerializationWithKeyNormalization(): Boolean =
+    rowComp.supportsSerializationWithKeyNormalization()
+
+  override def getNormalizeKeyLen: Int = rowComp.getNormalizeKeyLen
+
+  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+    rowComp.isNormalizedKeyPrefixOnly(keyBytes)
+
+  override def putNormalizedKey(
+      record: CRow,
+      target: MemorySegment,
+      offset: Int,
+      numBytes: Int): Unit = rowComp.putNormalizedKey(record.row, target, offset, numBytes)
+
+  override def writeWithKeyNormalization(record: CRow, target: DataOutputView): Unit = {
+    rowComp.writeWithKeyNormalization(record.row, target)
+    target.writeBoolean(record.change)
+  }
+
+  override def readWithKeyDenormalization(reuse: CRow, source: DataInputView): CRow = {
+    val row = rowComp.readWithKeyDenormalization(reuse.row, source)
+    reuse.row = row
+    reuse.change = source.readBoolean()
+    reuse
+  }
+
+  override def invertNormalizedKey(): Boolean = rowComp.invertNormalizedKey()
+
+  override def duplicate(): TypeComparator[CRow] = new CRowComparator(rowComp.duplicate())
+
+  override def extractKeys(record: scala.Any, target: Array[AnyRef], index: Int): Int =
+    rowComp.extractKeys(record.asInstanceOf[CRow].row, target, index)
+
+  override def getFlatComparators: Array[TypeComparator[_]] =
+    rowComp.getFlatComparators
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
new file mode 100644
index 0000000..1f56a98
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.Row
+
+class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[CRow] = new CRowSerializer(rowSerializer.duplicate())
+
+  override def createInstance(): CRow = new CRow(rowSerializer.createInstance(), true)
+
+  override def copy(from: CRow): CRow = new CRow(rowSerializer.copy(from.row), from.change)
+
+  override def copy(from: CRow, reuse: CRow): CRow = {
+    rowSerializer.copy(from.row, reuse.row)
+    reuse.change = from.change
+    reuse
+  }
+
+  override def getLength: Int = -1
+
+  override def serialize(record: CRow, target: DataOutputView): Unit = {
+    rowSerializer.serialize(record.row, target)
+    target.writeBoolean(record.change)
+  }
+
+  override def deserialize(source: DataInputView): CRow = {
+    val row = rowSerializer.deserialize(source)
+    val change = source.readBoolean()
+    new CRow(row, change)
+  }
+
+  override def deserialize(reuse: CRow, source: DataInputView): CRow = {
+    rowSerializer.deserialize(reuse.row, source)
+    reuse.change = source.readBoolean()
+    reuse
+  }
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    rowSerializer.copy(source, target)
+    target.writeBoolean(source.readBoolean())
+  }
+
+  override def canEqual(obj: Any): Boolean = obj.isInstanceOf[CRowSerializer]
+
+  override def equals(obj: Any): Boolean = {
+
+    if (canEqual(obj)) {
+      val other = obj.asInstanceOf[CRowSerializer]
+      rowSerializer.equals(other.rowSerializer)
+    } else {
+      false
+    }
+  }
+
+  override def hashCode: Int = rowSerializer.hashCode() * 13
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
new file mode 100644
index 0000000..456207a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) {
+
+  override def getFieldNames: Array[String] = rowType.getFieldNames
+
+  override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName)
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
+    rowType.getTypeAt(fieldExpression)
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] =
+    rowType.getTypeAt(pos)
+
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit =
+    rowType.getFlatFields(fieldExpression, offset, result)
+
+  override def isBasicType: Boolean = rowType.isBasicType
+
+  override def isTupleType: Boolean = rowType.isTupleType
+
+  override def getArity: Int = rowType.getArity
+
+  override def getTotalFields: Int = rowType.getTotalFields
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] =
+    new CRowSerializer(rowType.createSerializer(config))
+
+  // not implemented because we override createComparator
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null
+
+  override def createComparator(
+      logicalKeyFields: Array[Int],
+      orders: Array[Boolean],
+      logicalFieldOffset: Int,
+      config: ExecutionConfig): TypeComparator[CRow] = {
+
+    val rowComparator = rowType.createComparator(
+      logicalKeyFields,
+      orders,
+      logicalFieldOffset,
+      config)
+
+    new CRowComparator(rowComparator)
+  }
+
+  override def equals(obj: scala.Any): Boolean = {
+    if (this.canEqual(obj)) {
+      rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
+    } else {
+      false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo]
+
+}
+
+object CRowTypeInfo {
+
+  def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
+    rowType match {
+      case r: RowTypeInfo => new CRowTypeInfo(r)
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index c37ee74..4a2fcdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,6 +25,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * A simple [[TableSink]] to emit data as CSV files.
@@ -133,3 +134,4 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
     builder.mkString
   }
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 50fafbe..ef0c2e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,10 +21,11 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
 import org.apache.flink.table.utils.TableTestUtil._
 
@@ -40,6 +41,10 @@ class TableEnvironmentTest extends TableTestBase {
     STRING_TYPE_INFO,
     DOUBLE_TYPE_INFO)
 
+  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
+
+  val cRowType = new CRowTypeInfo(rowType)
+
   val caseClassType = implicitly[TypeInformation[CClass]]
 
   val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
@@ -55,6 +60,14 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
+  def testGetFieldInfoCRow(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(cRowType)
+
+    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
   def testGetFieldInfoCClass(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(caseClassType)
 
@@ -93,6 +106,20 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
+  def testGetFieldInfoCRowNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      cRowType,
+      Array(
+        UnresolvedFieldReference("name1"),
+        UnresolvedFieldReference("name2"),
+        UnresolvedFieldReference("name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
   def testGetFieldInfoCClassNames(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       caseClassType,
@@ -192,6 +219,45 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
+  def testGetFieldInfoCRowAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      cRowType,
+      Array(
+        Alias(UnresolvedFieldReference("f0"), "name1"),
+        Alias(UnresolvedFieldReference("f1"), "name2"),
+        Alias(UnresolvedFieldReference("f2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCRowAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      cRowType,
+      Array(
+        Alias(UnresolvedFieldReference("f2"), "name1"),
+        Alias(UnresolvedFieldReference("f0"), "name2"),
+        Alias(UnresolvedFieldReference("f1"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoCRowAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      cRowType,
+      Array(
+        Alias(UnresolvedFieldReference("xxx"), "name1"),
+        Alias(UnresolvedFieldReference("yyy"), "name2"),
+        Alias(UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
   def testGetFieldInfoCClassAlias1(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       caseClassType,


Mime
View raw message