flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [28/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AvgFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AvgFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AvgFunctionTest.scala
new file mode 100644
index 0000000..0671b40
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AvgFunctionTest.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in average aggregate function.
+  *
+  * Supplies the input value sets and the expected AVG for each set; the two
+  * sequences are positionally aligned (entry i of `expectedResults` is the
+  * expected average of entry i of `inputValueSets`). `null` entries are
+  * expected to be ignored by the aggregate function.
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class AvgAggFunctionTestBase[T: Numeric, ACC] extends AggFunctionTestBase[T, ACC] {
+
+  // Numeric evidence for T, used to build type-generic literals below.
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  // Smallest input value; provided by the concrete subclass.
+  def minVal: T
+
+  // Largest input value; provided by the concrete subclass.
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    // every non-null value is minVal -> average is minVal
+    Seq(
+      minVal,
+      minVal,
+      null.asInstanceOf[T],
+      minVal,
+      minVal,
+      null.asInstanceOf[T],
+      minVal,
+      minVal,
+      minVal
+    ),
+    // every non-null value is maxVal -> average is maxVal
+    Seq(
+      maxVal,
+      maxVal,
+      null.asInstanceOf[T],
+      maxVal,
+      maxVal,
+      null.asInstanceOf[T],
+      maxVal,
+      maxVal,
+      maxVal
+    ),
+    // values cancel pairwise -> average is 0; note that negate(minVal)
+    // requires subclasses to avoid the exact MinValue of fixed-width
+    // integer types (hence MinValue + 1 in the concrete tests)
+    Seq(
+      minVal,
+      maxVal,
+      null.asInstanceOf[T],
+      numeric.fromInt(0),
+      numeric.negate(maxVal),
+      numeric.negate(minVal),
+      null.asInstanceOf[T]
+    ),
+    // avg(1, 2, 3, 4, 5) = 3
+    Seq(
+      numeric.fromInt(1),
+      numeric.fromInt(2),
+      null.asInstanceOf[T],
+      numeric.fromInt(3),
+      numeric.fromInt(4),
+      numeric.fromInt(5),
+      null.asInstanceOf[T]
+    ),
+    // only nulls -> result is null
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  // Expected AVG per input set above, in the same order.
+  override def expectedResults: Seq[T] = Seq(
+    minVal,
+    maxVal,
+    numeric.fromInt(0),
+    numeric.fromInt(3),
+    null.asInstanceOf[T]
+  )
+
+  // The retract method is looked up reflectively because its parameter types
+  // are erased on the concrete function classes.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}
+
+class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte, IntegralAvgAccumulator] {
+
+  /** One above Byte.MinValue, so the base data's negate(minVal) cannot overflow. */
+  override def minVal: Byte = (Byte.MinValue + 1).toByte
+
+  /** One below Byte.MaxValue, symmetric to [[minVal]]. */
+  override def maxVal: Byte = (Byte.MaxValue - 1).toByte
+
+  override def aggregator: AggregateFunction[Byte, IntegralAvgAccumulator] =
+    new ByteAvgAggFunction()
+}
+
+class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short, IntegralAvgAccumulator] {
+
+  /** One above Short.MinValue, so the base data's negate(minVal) cannot overflow. */
+  override def minVal: Short = (Short.MinValue + 1).toShort
+
+  /** One below Short.MaxValue, symmetric to [[minVal]]. */
+  override def maxVal: Short = (Short.MaxValue - 1).toShort
+
+  override def aggregator: AggregateFunction[Short, IntegralAvgAccumulator] =
+    new ShortAvgAggFunction()
+}
+
+class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int, IntegralAvgAccumulator] {
+
+  /** One above Int.MinValue, so the base data's negate(minVal) cannot overflow. */
+  override def minVal: Int = Int.MinValue + 1
+
+  /** One below Int.MaxValue, symmetric to [[minVal]]. */
+  override def maxVal: Int = Int.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Int, IntegralAvgAccumulator] =
+    new IntAvgAggFunction()
+}
+
+class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long, BigIntegralAvgAccumulator] {
+
+  /** One above Long.MinValue, so the base data's negate(minVal) cannot overflow. */
+  override def minVal: Long = Long.MinValue + 1
+
+  /** One below Long.MaxValue, symmetric to [[minVal]]. */
+  override def maxVal: Long = Long.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Long, BigIntegralAvgAccumulator] =
+    new LongAvgAggFunction()
+}
+
+class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float, FloatingAvgAccumulator] {
+
+  // Full Float range is used here. NOTE(review): presumably the accumulator
+  // sums in a wider type, since 7 * Float.MaxValue overflows Float -- confirm.
+  override def minVal: Float = Float.MinValue
+
+  override def maxVal: Float = Float.MaxValue
+
+  override def aggregator: AggregateFunction[Float, FloatingAvgAccumulator] =
+    new FloatAvgAggFunction()
+}
+
+class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double, FloatingAvgAccumulator] {
+
+  // Uses Float's bounds rather than Double's. NOTE(review): presumably
+  // deliberate -- the base data averages seven copies of maxVal, and if the
+  // accumulator sums in a Double field, 7 * Double.MaxValue would overflow
+  // to Infinity, while 7 * Float.MaxValue fits comfortably in a Double.
+  // Confirm against FloatingAvgAccumulator before "fixing" this.
+  override def minVal = Float.MinValue
+
+  override def maxVal = Float.MaxValue
+
+  override def aggregator = new DoubleAvgAggFunction()
+}
+
+class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal, DecimalAvgAccumulator] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // non-null values sum to exactly 0 over 5 entries -> average 0
+    Seq(
+      new BigDecimal("987654321000000"),
+      new BigDecimal("-0.000000000012345"),
+      null,
+      new BigDecimal("0.000000000012345"),
+      new BigDecimal("-987654321000000"),
+      null,
+      new BigDecimal("0")
+    ),
+    // same values but last entry 5: sum 5 over 5 non-null entries -> average 1
+    Seq(
+      new BigDecimal("987654321000000"),
+      new BigDecimal("-0.000000000012345"),
+      null,
+      new BigDecimal("0.000000000012345"),
+      new BigDecimal("-987654321000000"),
+      null,
+      new BigDecimal("5")
+    ),
+    // only nulls -> result is null
+    Seq(
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  // Expected AVG per input set, positionally aligned with inputValueSets.
+  // NOTE(review): assumes the comparison tolerates BigDecimal scale
+  // differences (ZERO vs e.g. "0E-15") -- confirm in AggFunctionTestBase.
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    BigDecimal.ZERO,
+    BigDecimal.ONE,
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal, DecimalAvgAccumulator] =
+    new DecimalAvgAggFunction()
+
+  // The retract method is looked up reflectively because its parameter types
+  // are erased on the concrete function class.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CountAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CountAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CountAggFunctionTest.scala
new file mode 100644
index 0000000..f9dd474
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CountAggFunctionTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{CountAccumulator, CountAggFunction}
+
+/**
+  * Test case for built-in count aggregate function
+  */
+class CountAggFunctionTest extends AggFunctionTestBase[Long, CountAccumulator] {
+
+  /** COUNT ignores nulls: six non-null values in the first set, none in the second. */
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq("a", "b", null, "c", null, "d", "e", null, "f"),
+    Seq.fill(6)(null)
+  )
+
+  /** Expected counts, positionally aligned with the input sets above. */
+  override def expectedResults: Seq[Long] = List(6L, 0L)
+
+  override def aggregator: AggregateFunction[Long, CountAccumulator] = new CountAggFunction()
+
+  // retract is resolved reflectively because its parameter types are erased.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
new file mode 100644
index 0000000..8a46ec5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxAggFunctionTest.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in max aggregate function.
+  *
+  * Input sets and expected results are positionally aligned: entry i of
+  * `expectedResults` is the expected MAX over entry i of `inputValueSets`.
+  * `null` entries must be ignored; an all-null set must yield null.
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MaxAccumulator[T]] {
+
+  // Numeric evidence for T, used to build type-generic literals below.
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  // Smallest input value; provided by the concrete subclass.
+  def minVal: T
+
+  // Largest input value; provided by the concrete subclass.
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    // mixed values with nulls interleaved -> max is maxVal
+    Seq(
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      maxVal,
+      numeric.fromInt(-99),
+      numeric.fromInt(3),
+      numeric.fromInt(56),
+      numeric.fromInt(0),
+      minVal,
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T]
+    ),
+    // only nulls -> result is null
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  // Expected MAX per input set above, in the same order.
+  override def expectedResults: Seq[T] = Seq(
+    maxVal,
+    null.asInstanceOf[T]
+  )
+}
+
+class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] {
+
+  /** One below Byte.MaxValue (exact extremes are not used). */
+  override def maxVal: Byte = (Byte.MaxValue - 1).toByte
+
+  /** One above Byte.MinValue. */
+  override def minVal: Byte = (Byte.MinValue + 1).toByte
+
+  override def aggregator: AggregateFunction[Byte, MaxAccumulator[Byte]] =
+    new ByteMaxAggFunction()
+}
+
+class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] {
+
+  /** One below Short.MaxValue (exact extremes are not used). */
+  override def maxVal: Short = (Short.MaxValue - 1).toShort
+
+  /** One above Short.MinValue. */
+  override def minVal: Short = (Short.MinValue + 1).toShort
+
+  override def aggregator: AggregateFunction[Short, MaxAccumulator[Short]] =
+    new ShortMaxAggFunction()
+}
+
+class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] {
+
+  /** One below Int.MaxValue (exact extremes are not used). */
+  override def maxVal: Int = Int.MaxValue - 1
+
+  /** One above Int.MinValue. */
+  override def minVal: Int = Int.MinValue + 1
+
+  override def aggregator: AggregateFunction[Int, MaxAccumulator[Int]] =
+    new IntMaxAggFunction()
+}
+
+class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] {
+
+  /** One below Long.MaxValue (exact extremes are not used). */
+  override def maxVal: Long = Long.MaxValue - 1
+
+  /** One above Long.MinValue. */
+  override def minVal: Long = Long.MinValue + 1
+
+  override def aggregator: AggregateFunction[Long, MaxAccumulator[Long]] =
+    new LongMaxAggFunction()
+}
+
+class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] {
+
+  /** Half of Float.MaxValue (the exact extremes are not used). */
+  override def maxVal: Float = Float.MaxValue / 2
+
+  /** Half of Float.MinValue. */
+  override def minVal: Float = Float.MinValue / 2
+
+  override def aggregator: AggregateFunction[Float, MaxAccumulator[Float]] =
+    new FloatMaxAggFunction()
+}
+
+class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] {
+
+  /** Half of Double.MaxValue (the exact extremes are not used). */
+  override def maxVal: Double = Double.MaxValue / 2
+
+  /** Half of Double.MinValue. */
+  override def minVal: Double = Double.MinValue / 2
+
+  override def aggregator: AggregateFunction[Double, MaxAccumulator[Double]] =
+    new DoubleMaxAggFunction()
+}
+
+class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean, MaxAccumulator[Boolean]] {
+
+  // MAX over booleans treats true > false.
+  // NOTE(review): `null.asInstanceOf[Boolean]` unboxes to `false` for the
+  // primitive Boolean type, so these entries probably arrive as `false`
+  // rather than null -- confirm whether boxed nulls were intended.
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    // all false -> false
+    Seq(
+      false,
+      false,
+      false
+    ),
+    // all true -> true
+    Seq(
+      true,
+      true,
+      true
+    ),
+    // mixed -> true
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    ),
+    // "all null" set (see NOTE above)
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  // Expected MAX per input set above, in the same order.
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    true,
+    null.asInstanceOf[Boolean]
+  )
+
+  override def aggregator: AggregateFunction[Boolean, MaxAccumulator[Boolean]] =
+    new BooleanMaxAggFunction()
+}
+
+class DecimalMaxAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, MaxAccumulator[BigDecimal]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // mixed positive/negative decimals with nulls -> max is 1000.000001
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("1000.000001"),
+      new BigDecimal("-1"),
+      new BigDecimal("-999.998999"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-999.999"),
+      null,
+      new BigDecimal("999.999")
+    ),
+    // only nulls -> result is null
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  // Expected MAX per input set, positionally aligned with inputValueSets.
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("1000.000001"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal, MaxAccumulator[BigDecimal]] =
+    new DecimalMaxAggFunction()
+}
+
+class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulator[String]] {
+
+  /**
+    * MAX on strings uses lexicographic (String#compareTo) order: digits sort
+    * before upper case, which sorts before lower case, so "household" is the
+    * largest of the third set.
+    */
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // `new String(...)` wrappers were redundant allocations; plain literals
+    // compare identically via equals/compareTo.
+    Seq("a", "b", "c", null, "d"),
+    Seq(null, null, null),
+    Seq("1House", "Household", "house", "household")
+  )
+
+  // Expected MAX per input set, positionally aligned with inputValueSets.
+  override def expectedResults: Seq[String] = Seq(
+    "d",
+    null,
+    "household"
+  )
+
+  override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
+    new StringMaxAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..246d964
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in max with retraction aggregate function.
+  *
+  * Input sets and expected results are positionally aligned: entry i of
+  * `expectedResults` is the expected MAX over entry i of `inputValueSets`.
+  * `null` entries must be ignored; an all-null set must yield null.
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunctionTest[T: Numeric]
+  extends AggFunctionTestBase[T, MaxWithRetractAccumulator[T]] {
+
+  // Numeric evidence for T, used to build type-generic literals below.
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  // Smallest input value; provided by the concrete subclass.
+  def minVal: T
+
+  // Largest input value; provided by the concrete subclass.
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    // mixed values with nulls interleaved -> max is maxVal
+    Seq(
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      maxVal,
+      numeric.fromInt(-99),
+      numeric.fromInt(3),
+      numeric.fromInt(56),
+      numeric.fromInt(0),
+      minVal,
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T]
+    ),
+    // only nulls -> result is null
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  // Expected MAX per input set above, in the same order.
+  override def expectedResults: Seq[T] = Seq(
+    maxVal,
+    null.asInstanceOf[T]
+  )
+
+  // The retract method is looked up reflectively because its parameter types
+  // are erased on the concrete function classes.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}
+
+class ByteMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Byte] {
+
+  /** One below Byte.MaxValue (exact extremes are not used). */
+  override def maxVal: Byte = (Byte.MaxValue - 1).toByte
+
+  /** One above Byte.MinValue. */
+  override def minVal: Byte = (Byte.MinValue + 1).toByte
+
+  override def aggregator: AggregateFunction[Byte, MaxWithRetractAccumulator[Byte]] =
+    new ByteMaxWithRetractAggFunction()
+}
+
+class ShortMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Short] {
+
+  /** One below Short.MaxValue (exact extremes are not used). */
+  override def maxVal: Short = (Short.MaxValue - 1).toShort
+
+  /** One above Short.MinValue. */
+  override def minVal: Short = (Short.MinValue + 1).toShort
+
+  override def aggregator: AggregateFunction[Short, MaxWithRetractAccumulator[Short]] =
+    new ShortMaxWithRetractAggFunction()
+}
+
+class IntMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Int] {
+
+  /** One below Int.MaxValue (exact extremes are not used). */
+  override def maxVal: Int = Int.MaxValue - 1
+
+  /** One above Int.MinValue. */
+  override def minVal: Int = Int.MinValue + 1
+
+  override def aggregator: AggregateFunction[Int, MaxWithRetractAccumulator[Int]] =
+    new IntMaxWithRetractAggFunction()
+}
+
+class LongMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Long] {
+
+  /** One below Long.MaxValue (exact extremes are not used). */
+  override def maxVal: Long = Long.MaxValue - 1
+
+  /** One above Long.MinValue. */
+  override def minVal: Long = Long.MinValue + 1
+
+  override def aggregator: AggregateFunction[Long, MaxWithRetractAccumulator[Long]] =
+    new LongMaxWithRetractAggFunction()
+}
+
+class FloatMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Float] {
+
+  /** Half of Float.MaxValue (the exact extremes are not used). */
+  override def maxVal: Float = Float.MaxValue / 2
+
+  /** Half of Float.MinValue. */
+  override def minVal: Float = Float.MinValue / 2
+
+  override def aggregator: AggregateFunction[Float, MaxWithRetractAccumulator[Float]] =
+    new FloatMaxWithRetractAggFunction()
+}
+
+class DoubleMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Double] {
+
+  /** Half of Double.MaxValue (the exact extremes are not used). */
+  override def maxVal: Double = Double.MaxValue / 2
+
+  /** Half of Double.MinValue. */
+  override def minVal: Double = Double.MinValue / 2
+
+  override def aggregator: AggregateFunction[Double, MaxWithRetractAccumulator[Double]] =
+    new DoubleMaxWithRetractAggFunction()
+}
+
+class BooleanMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[Boolean, MaxWithRetractAccumulator[Boolean]] {
+
+  // MAX over booleans treats true > false.
+  // NOTE(review): `null.asInstanceOf[Boolean]` unboxes to `false` for the
+  // primitive Boolean type, so these entries probably arrive as `false`
+  // rather than null -- confirm whether boxed nulls were intended.
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    // all false -> false
+    Seq(
+      false,
+      false,
+      false
+    ),
+    // all true -> true
+    Seq(
+      true,
+      true,
+      true
+    ),
+    // mixed -> true
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    ),
+    // "all null" set (see NOTE above)
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  // Expected MAX per input set above, in the same order.
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    true,
+    null.asInstanceOf[Boolean]
+  )
+
+  override def aggregator: AggregateFunction[Boolean, MaxWithRetractAccumulator[Boolean]] =
+    new BooleanMaxWithRetractAggFunction()
+
+  // retract is resolved reflectively because its parameter types are erased.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}
+
+class DecimalMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // mixed positive/negative decimals with nulls -> max is 1000.000001
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("1000.000001"),
+      new BigDecimal("-1"),
+      new BigDecimal("-999.998999"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-999.999"),
+      null,
+      new BigDecimal("999.999")
+    ),
+    // only nulls -> result is null
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  // Expected MAX per input set, positionally aligned with inputValueSets.
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("1000.000001"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] =
+    new DecimalMaxWithRetractAggFunction()
+
+  // retract is resolved reflectively because its parameter types are erased.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}
+
+class StringMaxWithRetractAggFunctionTest
+  extends AggFunctionTestBase[String, MaxWithRetractAccumulator[String]] {
+
+  /** Lexicographically largest non-null string wins; an all-null set yields null. */
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq("abc", "def", "ghi", null, "jkl", null, "zzz"),
+    Seq(null, null),
+    Seq("x", null, "e")
+  )
+
+  /** Expected MAX per input set, positionally aligned with inputValueSets. */
+  override def expectedResults: Seq[String] = Seq("zzz", null, "x")
+
+  override def aggregator: AggregateFunction[String, MaxWithRetractAccumulator[String]] =
+    new StringMaxWithRetractAggFunction()
+
+  // retract is resolved reflectively because its parameter types are erased.
+  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
new file mode 100644
index 0000000..80fcace
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinAggFunctionTest.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in min aggregate function.
+  *
+  * Input sets and expected results are positionally aligned: entry i of
+  * `expectedResults` is the expected MIN over entry i of `inputValueSets`.
+  * `null` entries must be ignored; an all-null set must yield null.
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MinAccumulator[T]] {
+
+  // Numeric evidence for T, used to build type-generic literals below.
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  // Smallest input value; provided by the concrete subclass.
+  def minVal: T
+
+  // Largest input value; provided by the concrete subclass.
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    // mixed values with nulls interleaved -> min is minVal
+    Seq(
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      maxVal,
+      numeric.fromInt(-99),
+      numeric.fromInt(3),
+      numeric.fromInt(56),
+      numeric.fromInt(0),
+      minVal,
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T]
+    ),
+    // only nulls -> result is null
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  // Expected MIN per input set above, in the same order.
+  override def expectedResults: Seq[T] = Seq(
+    minVal,
+    null.asInstanceOf[T]
+  )
+}
+
+class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] {
+
+  /** One below Byte.MaxValue (exact extremes are not used). */
+  override def maxVal: Byte = (Byte.MaxValue - 1).toByte
+
+  /** One above Byte.MinValue. */
+  override def minVal: Byte = (Byte.MinValue + 1).toByte
+
+  override def aggregator: AggregateFunction[Byte, MinAccumulator[Byte]] =
+    new ByteMinAggFunction()
+}
+
+class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] {
+
+  /** One below Short.MaxValue (exact extremes are not used). */
+  override def maxVal: Short = (Short.MaxValue - 1).toShort
+
+  /** One above Short.MinValue. */
+  override def minVal: Short = (Short.MinValue + 1).toShort
+
+  override def aggregator: AggregateFunction[Short, MinAccumulator[Short]] =
+    new ShortMinAggFunction()
+}
+
+class IntMinAggFunctionTest extends MinAggFunctionTest[Int] {
+
+  /** One below Int.MaxValue (exact extremes are not used). */
+  override def maxVal: Int = Int.MaxValue - 1
+
+  /** One above Int.MinValue. */
+  override def minVal: Int = Int.MinValue + 1
+
+  override def aggregator: AggregateFunction[Int, MinAccumulator[Int]] =
+    new IntMinAggFunction()
+}
+
+class LongMinAggFunctionTest extends MinAggFunctionTest[Long] {
+
+  /** One below Long.MaxValue (exact extremes are not used). */
+  override def maxVal: Long = Long.MaxValue - 1
+
+  /** One above Long.MinValue. */
+  override def minVal: Long = Long.MinValue + 1
+
+  override def aggregator: AggregateFunction[Long, MinAccumulator[Long]] =
+    new LongMinAggFunction()
+}
+
+class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] {
+
+  /** Half of Float.MaxValue (the exact extremes are not used). */
+  override def maxVal: Float = Float.MaxValue / 2
+
+  /** Half of Float.MinValue. */
+  override def minVal: Float = Float.MinValue / 2
+
+  override def aggregator: AggregateFunction[Float, MinAccumulator[Float]] =
+    new FloatMinAggFunction()
+}
+
+class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] {
+
+  /** Half of Double.MaxValue (the exact extremes are not used). */
+  override def maxVal: Double = Double.MaxValue / 2
+
+  /** Half of Double.MinValue. */
+  override def minVal: Double = Double.MinValue / 2
+
+  override def aggregator: AggregateFunction[Double, MinAccumulator[Double]] =
+    new DoubleMinAggFunction()
+}
+
+class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean, MinAccumulator[Boolean]] {
+
+  // MIN over booleans treats false < true.
+  // NOTE(review): `null.asInstanceOf[Boolean]` unboxes to `false` for the
+  // primitive Boolean type, so these entries probably arrive as `false`
+  // rather than null -- confirm whether boxed nulls were intended.
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    // all false -> false
+    Seq(
+      false,
+      false,
+      false
+    ),
+    // all true -> true
+    Seq(
+      true,
+      true,
+      true
+    ),
+    // mixed -> false
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    ),
+    // "all null" set (see NOTE above)
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  // Expected MIN per input set above, in the same order.
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    false,
+    null.asInstanceOf[Boolean]
+  )
+
+  override def aggregator: AggregateFunction[Boolean, MinAccumulator[Boolean]] =
+    new BooleanMinAggFunction()
+}
+
+class DecimalMinAggFunctionTest
+  extends AggFunctionTestBase[BigDecimal, MinAccumulator[BigDecimal]] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // mixed positive/negative decimals with nulls -> min is -999.999
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("1000"),
+      new BigDecimal("-1"),
+      new BigDecimal("-999.998999"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-999.999"),
+      null,
+      new BigDecimal("999.999")
+    ),
+    // only nulls -> result is null
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  // Expected MIN per input set, positionally aligned with inputValueSets.
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("-999.999"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal, MinAccumulator[BigDecimal]] =
+    new DecimalMinAggFunction()
+}
+
+class StringMinAggFunctionTest
+  extends AggFunctionTestBase[String, MinAccumulator[String]] {
+
+  /**
+    * MIN on strings uses lexicographic (String#compareTo) order: digits sort
+    * before upper case, which sorts before lower case, so "1House" is the
+    * smallest of the third set.
+    */
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    // `new String(...)` wrappers were redundant allocations; plain literals
+    // compare identically via equals/compareTo.
+    Seq("a", "b", "c", null, "d"),
+    Seq(null, null, null),
+    Seq("1House", "Household", "house", "household")
+  )
+
+  // Expected MIN per input set, positionally aligned with inputValueSets.
+  override def expectedResults: Seq[String] = Seq(
+    "a",
+    null,
+    "1House"
+  )
+
+  override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
+    new StringMinAggFunction()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..c4273f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in Min with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
/**
  * Shared test specification for the built-in Min-with-retraction aggregate
  * functions over numeric types. Concrete subclasses only supply the extreme
  * values and the aggregator instance.
  *
  * @tparam T the type of the aggregation result
  */
abstract class MinWithRetractAggFunctionTest[T: Numeric]
  extends AggFunctionTestBase[T, MinWithRetractAccumulator[T]] {

  private val num = implicitly[Numeric[T]]

  /** Smallest input value; it is also the expected minimum of the mixed set. */
  def minVal: T

  /** Largest input value of the mixed set. */
  def maxVal: T

  override def inputValueSets: Seq[Seq[T]] = {
    val nullT = null.asInstanceOf[T]
    // one mixed set (nulls must be ignored) and one all-null set
    val mixed = Seq(
      num.fromInt(1), nullT, maxVal, num.fromInt(-99), num.fromInt(3),
      num.fromInt(56), num.fromInt(0), minVal, num.fromInt(-20),
      num.fromInt(17), nullT
    )
    Seq(mixed, Seq.fill(6)(nullT))
  }

  override def expectedResults: Seq[T] =
    Seq(minVal, null.asInstanceOf[T])

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
/** Byte specialization of the Min-with-retract test. */
class ByteMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Byte] {
  override def minVal: Byte = (Byte.MinValue + 1).toByte
  override def maxVal: Byte = (Byte.MaxValue - 1).toByte
  override def aggregator: AggregateFunction[Byte, MinWithRetractAccumulator[Byte]] =
    new ByteMinWithRetractAggFunction()
}

/** Short specialization of the Min-with-retract test. */
class ShortMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Short] {
  override def minVal: Short = (Short.MinValue + 1).toShort
  override def maxVal: Short = (Short.MaxValue - 1).toShort
  override def aggregator: AggregateFunction[Short, MinWithRetractAccumulator[Short]] =
    new ShortMinWithRetractAggFunction()
}

/** Int specialization of the Min-with-retract test. */
class IntMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Int] {
  override def minVal: Int = Int.MinValue + 1
  override def maxVal: Int = Int.MaxValue - 1
  override def aggregator: AggregateFunction[Int, MinWithRetractAccumulator[Int]] =
    new IntMinWithRetractAggFunction()
}

/** Long specialization of the Min-with-retract test. */
class LongMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Long] {
  override def minVal: Long = Long.MinValue + 1
  override def maxVal: Long = Long.MaxValue - 1
  override def aggregator: AggregateFunction[Long, MinWithRetractAccumulator[Long]] =
    new LongMinWithRetractAggFunction()
}

/** Float specialization; halved extremes keep the arithmetic finite. */
class FloatMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Float] {
  override def minVal: Float = Float.MinValue / 2
  override def maxVal: Float = Float.MaxValue / 2
  override def aggregator: AggregateFunction[Float, MinWithRetractAccumulator[Float]] =
    new FloatMinWithRetractAggFunction()
}

/** Double specialization; halved extremes keep the arithmetic finite. */
class DoubleMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Double] {
  override def minVal: Double = Double.MinValue / 2
  override def maxVal: Double = Double.MaxValue / 2
  override def aggregator: AggregateFunction[Double, MinWithRetractAccumulator[Double]] =
    new DoubleMinWithRetractAggFunction()
}
+
/** Test case for Min-with-retract over Boolean values (false < true). */
class BooleanMinWithRetractAggFunctionTest
  extends AggFunctionTestBase[Boolean, MinWithRetractAccumulator[Boolean]] {

  // NOTE(review): `null.asInstanceOf[Boolean]` unboxes to `false` for the
  // primitive Boolean type, so the "null" entries below are effectively
  // `false`. Kept verbatim to preserve the original test data exactly.
  private val nullBool = null.asInstanceOf[Boolean]

  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
    Seq(false, false, false),
    Seq(true, true, true),
    Seq(true, false, nullBool, true, false, true, nullBool),
    Seq(nullBool, nullBool, nullBool)
  )

  override def expectedResults: Seq[Boolean] =
    Seq(false, true, false, nullBool)

  override def aggregator: AggregateFunction[Boolean, MinWithRetractAccumulator[Boolean]] =
    new BooleanMinWithRetractAggFunction()

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
/** Test case for Min-with-retract over java.math.BigDecimal values. */
class DecimalMinWithRetractAggFunctionTest
  extends AggFunctionTestBase[BigDecimal, MinWithRetractAccumulator[BigDecimal]] {

  // small constructor shorthand for readability
  private def dec(s: String): BigDecimal = new BigDecimal(s)

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // mixed values with nulls; -999.999 is the minimum
    Seq(
      dec("1"), dec("1000"), dec("-1"), dec("-999.998999"), null,
      dec("0"), dec("-999.999"), null, dec("999.999")
    ),
    // all-null input must produce null
    Seq.fill(5)(null)
  )

  override def expectedResults: Seq[BigDecimal] =
    Seq(dec("-999.999"), null)

  override def aggregator: AggregateFunction[BigDecimal, MinWithRetractAccumulator[BigDecimal]] =
    new DecimalMinWithRetractAggFunction()

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
/** Test case for Min-with-retract over String values (lexicographic order). */
class StringMinWithRetractAggFunctionTest
  extends AggFunctionTestBase[String, MinWithRetractAccumulator[String]] {

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // nulls interleaved with values; "abc" is the minimum
    Seq("abc", "def", "ghi", null, "jkl", null, "zzz"),
    // only nulls: result must be null
    Seq.fill(2)(null),
    // "e" < "x" lexicographically
    Seq("x", null, "e")
  )

  override def expectedResults: Seq[String] =
    Seq("abc", null, "e")

  override def aggregator: AggregateFunction[String, MinWithRetractAccumulator[String]] =
    new StringMinWithRetractAggFunction()

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0AggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0AggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0AggFunctionTest.scala
new file mode 100644
index 0000000..45c5aea
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0AggFunctionTest.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in sum0 aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
/**
  * Test base for built-in sum0 aggregate functions: like sum, but an
  * all-null input yields 0 rather than null.
  *
  * @tparam T the type of the aggregation result
  */
abstract class Sum0AggFunctionTestBase[T: Numeric]
  extends AggFunctionTestBase[T, SumAccumulator[T]] {

  private val numeric: Numeric[T] = implicitly[Numeric[T]]

  /** Largest magnitude fed into the aggregate; minVal is its negation. */
  def maxVal: T

  private val minVal = numeric.negate(maxVal)

  override def inputValueSets: Seq[Seq[T]] = Seq(
    // minVal and maxVal cancel, so this set sums to 1+2+3+4+5-10-20+17 = 2
    Seq(
      minVal,
      numeric.fromInt(1),
      null.asInstanceOf[T],
      numeric.fromInt(2),
      numeric.fromInt(3),
      numeric.fromInt(4),
      numeric.fromInt(5),
      numeric.fromInt(-10),
      numeric.fromInt(-20),
      numeric.fromInt(17),
      null.asInstanceOf[T],
      maxVal
    ),
    Seq(
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T]
    )
  )

  override def expectedResults: Seq[T] = Seq(
    numeric.fromInt(2),
    // Fix: `0.asInstanceOf[T]` is an unchecked cast on an erased type
    // parameter, so the expected value stayed boxed as java.lang.Integer
    // regardless of T. `numeric.fromInt(0)` yields a properly typed zero
    // of T (same numeric value, correct runtime box).
    numeric.fromInt(0)
  )
}
+
/** Byte specialization of the sum0 test; half the extreme avoids overflow. */
class ByteSum0AggFunctionTest extends Sum0AggFunctionTestBase[Byte] {
  override def maxVal: Byte = (Byte.MaxValue / 2).toByte
  override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] =
    new ByteSum0AggFunction
}

/** Short specialization of the sum0 test. */
class ShortSum0AggFunctionTest extends Sum0AggFunctionTestBase[Short] {
  override def maxVal: Short = (Short.MaxValue / 2).toShort
  override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] =
    new ShortSum0AggFunction
}

/** Int specialization of the sum0 test. */
class IntSum0AggFunctionTest extends Sum0AggFunctionTestBase[Int] {
  override def maxVal: Int = Int.MaxValue / 2
  override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] =
    new IntSum0AggFunction
}

/** Long specialization of the sum0 test. */
class LongSum0AggFunctionTest extends Sum0AggFunctionTestBase[Long] {
  override def maxVal: Long = Long.MaxValue / 2
  override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] =
    new LongSum0AggFunction
}

/** Float specialization; a modest bound keeps float sums exact. */
class FloatSum0AggFunctionTest extends Sum0AggFunctionTestBase[Float] {
  override def maxVal: Float = 12345.6789f
  override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] =
    new FloatSum0AggFunction
}

/** Double specialization; a modest bound keeps double sums exact. */
class DoubleSum0AggFunctionTest extends Sum0AggFunctionTestBase[Double] {
  override def maxVal: Double = 12345.6789d
  override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] =
    new DoubleSum0AggFunction
}
+
+
/** Decimal variant of the sum0 test; an all-null set must yield ZERO. */
class DecimalSum0AggFunctionTest
  extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] {

  private def dec(s: String): BigDecimal = new BigDecimal(s)

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // mixed values; the cancelling pairs leave 1+2+3+4+0.000000000001
    Seq(
      dec("1"), dec("2"), dec("3"), null, dec("0"), dec("-1000"),
      dec("0.000000000002"), dec("1000"), dec("-0.000000000001"),
      dec("999.999"), null, dec("4"), dec("-999.999"), null
    ),
    Seq.fill(5)(null)
  )

  override def expectedResults: Seq[BigDecimal] = Seq(
    dec("10.000000000001"),
    BigDecimal.ZERO
  )

  override def aggregator: AggregateFunction[BigDecimal, DecimalSumAccumulator] =
    new DecimalSum0AggFunction()
}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0WithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0WithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0WithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..7779614
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/Sum0WithRetractAggFunctionTest.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in sum0 with retract aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
/**
  * Test base for built-in sum0-with-retraction aggregate functions: like
  * sum with retraction, but an all-null input yields 0 rather than null.
  *
  * @tparam T the type of the aggregation result
  */
abstract class Sum0WithRetractAggFunctionTestBase[T: Numeric]
  extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] {

  private val numeric: Numeric[T] = implicitly[Numeric[T]]

  /** Largest magnitude fed into the aggregate; minVal is its negation. */
  def maxVal: T

  private val minVal = numeric.negate(maxVal)

  override def inputValueSets: Seq[Seq[T]] = Seq(
    // minVal and maxVal cancel, so this set sums to 1+2+3+4+5-10-20+17 = 2
    Seq(
      minVal,
      numeric.fromInt(1),
      null.asInstanceOf[T],
      numeric.fromInt(2),
      numeric.fromInt(3),
      numeric.fromInt(4),
      numeric.fromInt(5),
      numeric.fromInt(-10),
      numeric.fromInt(-20),
      numeric.fromInt(17),
      null.asInstanceOf[T],
      maxVal
    ),
    Seq(
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T],
      null.asInstanceOf[T]
    )
  )

  override def expectedResults: Seq[T] = Seq(
    numeric.fromInt(2),
    // Fix: `0.asInstanceOf[T]` is an unchecked cast on an erased type
    // parameter, leaving the expected value boxed as java.lang.Integer;
    // `numeric.fromInt(0)` produces a properly typed zero of T.
    numeric.fromInt(0)
  )

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
/** Byte specialization of the sum0-with-retract test. */
class ByteSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Byte] {
  override def maxVal: Byte = (Byte.MaxValue / 2).toByte
  override def aggregator: AggregateFunction[Byte, SumWithRetractAccumulator[Byte]] =
    new ByteSum0WithRetractAggFunction
}

/** Short specialization of the sum0-with-retract test. */
class ShortSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Short] {
  override def maxVal: Short = (Short.MaxValue / 2).toShort
  override def aggregator: AggregateFunction[Short, SumWithRetractAccumulator[Short]] =
    new ShortSum0WithRetractAggFunction
}

/** Int specialization of the sum0-with-retract test. */
class IntSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Int] {
  override def maxVal: Int = Int.MaxValue / 2
  override def aggregator: AggregateFunction[Int, SumWithRetractAccumulator[Int]] =
    new IntSum0WithRetractAggFunction
}

/** Long specialization of the sum0-with-retract test. */
class LongSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Long] {
  override def maxVal: Long = Long.MaxValue / 2
  override def aggregator: AggregateFunction[Long, SumWithRetractAccumulator[Long]] =
    new LongSum0WithRetractAggFunction
}

/** Float specialization of the sum0-with-retract test. */
class FloatSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Float] {
  override def maxVal: Float = 12345.6789f
  override def aggregator: AggregateFunction[Float, SumWithRetractAccumulator[Float]] =
    new FloatSum0WithRetractAggFunction
}

/** Double specialization of the sum0-with-retract test. */
class DoubleSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Double] {
  override def maxVal: Double = 12345.6789d
  override def aggregator: AggregateFunction[Double, SumWithRetractAccumulator[Double]] =
    new DoubleSum0WithRetractAggFunction
}
+
+
/** Decimal variant of the sum0-with-retract test; all-null yields ZERO. */
class DecimalSum0WithRetractAggFunctionTest
  extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] {

  private def dec(s: String): BigDecimal = new BigDecimal(s)

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // mixed values; the cancelling pairs leave 1+2+3+4+0.000000000001
    Seq(
      dec("1"), dec("2"), dec("3"), null, dec("0"), dec("-1000"),
      dec("0.000000000002"), dec("1000"), dec("-0.000000000001"),
      dec("999.999"), null, dec("4"), dec("-999.999"), null
    ),
    Seq.fill(5)(null)
  )

  override def expectedResults: Seq[BigDecimal] = Seq(
    dec("10.000000000001"),
    BigDecimal.ZERO
  )

  override def aggregator: AggregateFunction[BigDecimal, DecimalSumWithRetractAccumulator] =
    new DecimalSum0WithRetractAggFunction()

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumAggFunctionTest.scala
new file mode 100644
index 0000000..a1f31ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumAggFunctionTest.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in sum aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
/**
  * Shared test specification for the built-in sum aggregate functions over
  * numeric types; an all-null input is expected to produce null.
  *
  * @tparam T the type of the aggregation result
  */
abstract class SumAggFunctionTestBase[T: Numeric]
  extends AggFunctionTestBase[T, SumAccumulator[T]] {

  private val num = implicitly[Numeric[T]]

  /** Largest magnitude fed into the aggregate; its negation is also used. */
  def maxVal: T

  private val minVal = num.negate(maxVal)

  override def inputValueSets: Seq[Seq[T]] = {
    val nullT = null.asInstanceOf[T]
    // minVal and maxVal cancel, so the mixed set sums to 2
    val mixed = Seq(
      minVal, num.fromInt(1), nullT, num.fromInt(2), num.fromInt(3),
      num.fromInt(4), num.fromInt(5), num.fromInt(-10), num.fromInt(-20),
      num.fromInt(17), nullT, maxVal
    )
    Seq(mixed, Seq.fill(6)(nullT))
  }

  override def expectedResults: Seq[T] =
    Seq(num.fromInt(2), null.asInstanceOf[T])
}
+
/** Byte specialization of the sum test; half the extreme avoids overflow. */
class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] {
  override def maxVal: Byte = (Byte.MaxValue / 2).toByte
  override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] =
    new ByteSumAggFunction
}

/** Short specialization of the sum test. */
class ShortSumAggFunctionTest extends SumAggFunctionTestBase[Short] {
  override def maxVal: Short = (Short.MaxValue / 2).toShort
  override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] =
    new ShortSumAggFunction
}

/** Int specialization of the sum test. */
class IntSumAggFunctionTest extends SumAggFunctionTestBase[Int] {
  override def maxVal: Int = Int.MaxValue / 2
  override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] =
    new IntSumAggFunction
}

/** Long specialization of the sum test. */
class LongSumAggFunctionTest extends SumAggFunctionTestBase[Long] {
  override def maxVal: Long = Long.MaxValue / 2
  override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] =
    new LongSumAggFunction
}

/** Float specialization; a modest bound keeps float sums exact. */
class FloatSumAggFunctionTest extends SumAggFunctionTestBase[Float] {
  override def maxVal: Float = 12345.6789f
  override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] =
    new FloatSumAggFunction
}

/** Double specialization; a modest bound keeps double sums exact. */
class DoubleSumAggFunctionTest extends SumAggFunctionTestBase[Double] {
  override def maxVal: Double = 12345.6789d
  override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] =
    new DoubleSumAggFunction
}
+
+
/** Decimal variant of the sum test; an all-null input must produce null. */
class DecimalSumAggFunctionTest
  extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] {

  private def dec(s: String): BigDecimal = new BigDecimal(s)

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // mixed values; the cancelling pairs leave 1+2+3+4+0.000000000001
    Seq(
      dec("1"), dec("2"), dec("3"), null, dec("0"), dec("-1000"),
      dec("0.000000000002"), dec("1000"), dec("-0.000000000001"),
      dec("999.999"), null, dec("4"), dec("-999.999"), null
    ),
    Seq.fill(5)(null)
  )

  override def expectedResults: Seq[BigDecimal] = Seq(
    dec("10.000000000001"),
    null
  )

  override def aggregator: AggregateFunction[BigDecimal, DecimalSumAccumulator] =
    new DecimalSumAggFunction()
}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..7a992d7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/SumWithRetractAggFunctionTest.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions._
+
+/**
+  * Test case for built-in sum with retract aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
/**
  * Shared test specification for the built-in sum-with-retraction aggregate
  * functions over numeric types; an all-null input must produce null.
  *
  * @tparam T the type of the aggregation result
  */
abstract class SumWithRetractAggFunctionTestBase[T: Numeric]
  extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] {

  private val num = implicitly[Numeric[T]]

  /** Largest magnitude fed into the aggregate; its negation is also used. */
  def maxVal: T

  private val minVal = num.negate(maxVal)

  override def inputValueSets: Seq[Seq[T]] = {
    val nullT = null.asInstanceOf[T]
    // minVal and maxVal cancel, so the mixed set sums to 2
    val mixed = Seq(
      minVal, num.fromInt(1), nullT, num.fromInt(2), num.fromInt(3),
      num.fromInt(4), num.fromInt(5), num.fromInt(-10), num.fromInt(-20),
      num.fromInt(17), nullT, maxVal
    )
    Seq(mixed, Seq.fill(6)(nullT))
  }

  override def expectedResults: Seq[T] =
    Seq(num.fromInt(2), null.asInstanceOf[T])

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
/** Byte specialization of the sum-with-retract test. */
class ByteSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Byte] {
  override def maxVal: Byte = (Byte.MaxValue / 2).toByte
  override def aggregator: AggregateFunction[Byte, SumWithRetractAccumulator[Byte]] =
    new ByteSumWithRetractAggFunction
}

/** Short specialization of the sum-with-retract test. */
class ShortSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Short] {
  override def maxVal: Short = (Short.MaxValue / 2).toShort
  override def aggregator: AggregateFunction[Short, SumWithRetractAccumulator[Short]] =
    new ShortSumWithRetractAggFunction
}

/** Int specialization of the sum-with-retract test. */
class IntSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Int] {
  override def maxVal: Int = Int.MaxValue / 2
  override def aggregator: AggregateFunction[Int, SumWithRetractAccumulator[Int]] =
    new IntSumWithRetractAggFunction
}

/** Long specialization of the sum-with-retract test. */
class LongSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Long] {
  override def maxVal: Long = Long.MaxValue / 2
  override def aggregator: AggregateFunction[Long, SumWithRetractAccumulator[Long]] =
    new LongSumWithRetractAggFunction
}

/** Float specialization of the sum-with-retract test. */
class FloatSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Float] {
  override def maxVal: Float = 12345.6789f
  override def aggregator: AggregateFunction[Float, SumWithRetractAccumulator[Float]] =
    new FloatSumWithRetractAggFunction
}

/** Double specialization of the sum-with-retract test. */
class DoubleSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Double] {
  override def maxVal: Double = 12345.6789d
  override def aggregator: AggregateFunction[Double, SumWithRetractAccumulator[Double]] =
    new DoubleSumWithRetractAggFunction
}
+
+
/** Decimal variant of the sum-with-retract test; all-null must yield null. */
class DecimalSumWithRetractAggFunctionTest
  extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] {

  private def dec(s: String): BigDecimal = new BigDecimal(s)

  override def inputValueSets: Seq[Seq[_]] = Seq(
    // mixed values; the cancelling pairs leave 1+2+3+4+0.000000000001
    Seq(
      dec("1"), dec("2"), dec("3"), null, dec("0"), dec("-1000"),
      dec("0.000000000002"), dec("1000"), dec("-0.000000000001"),
      dec("999.999"), null, dec("4"), dec("-999.999"), null
    ),
    Seq.fill(5)(null)
  )

  override def expectedResults: Seq[BigDecimal] = Seq(
    dec("10.000000000001"),
    null
  )

  override def aggregator: AggregateFunction[BigDecimal, DecimalSumWithRetractAccumulator] =
    new DecimalSumWithRetractAggFunction()

  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
deleted file mode 100644
index df4e1aa..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.lang.{Integer => JInt, Long => JLong}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._
-import org.apache.flink.api.java.typeutils.runtime.RowComparator
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.streaming.api.TimeCharacteristic
-
-class TimeSortProcessFunctionTest {
-
-  
-  @Test
-  def testSortProcTimeHarnessPartitioned(): Unit = {
-    
-    val rT =  new RowTypeInfo(Array[TypeInformation[_]](
-      INT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      INT_TYPE_INFO,
-      STRING_TYPE_INFO,
-      LONG_TYPE_INFO),
-      Array("a", "b", "c", "d", "e"))
-    
-    val indexes = Array(1, 2)
-      
-    val fieldComps = Array[TypeComparator[AnyRef]](
-      LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
-      INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] )
-    val booleanOrders = Array(true, false)    
-    
-
-    val rowComp = new RowComparator(
-      rT.getTotalFields,
-      indexes,
-      fieldComps,
-      new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
-      booleanOrders)
-    
-    val collectionRowComparator = new CollectionRowComparator(rowComp)
-    
-    val inputCRowType = CRowTypeInfo(rT)
-    
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
-      new ProcTimeSortProcessFunction(
-        inputCRowType,
-        collectionRowComparator))
-  
-   val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,CRow,CRow](
-      processFunction, 
-      new TupleRowSelector(0), 
-      BasicTypeInfo.INT_TYPE_INFO)
-    
-   testHarness.open()
-
-   testHarness.setProcessingTime(3)
-
-      // timestamp is ignored in processing time
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2003))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2004))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
-
-    //move the timestamp to ensure the execution
-    testHarness.setProcessingTime(1005)
-    
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
-    
-    testHarness.setProcessingTime(1008)
-    
-    val result = testHarness.getOutput
-    
-    val expectedOutput = new ConcurrentLinkedQueue[Object]()
-    
-    // all elements at the same proc timestamp have the same value
-    // elements should be sorted ascending on field 1 and descending on field 2
-    // (10,0) (11,1) (12,2) (12,1) (12,0)
-    // (1,0) (2,0)
-    
-     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
-     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true), 4))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
-    
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
-    expectedOutput.add(new StreamRecord(new CRow(
-        Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
-
-    TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
-    
-    testHarness.close()
-  }
-  
-  @Test
-  def testSortRowTimeHarnessPartitioned(): Unit = {
-    
-    val rT =  new RowTypeInfo(Array[TypeInformation[_]](
-      INT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      INT_TYPE_INFO,
-      STRING_TYPE_INFO,
-      LONG_TYPE_INFO),
-      Array("a", "b", "c", "d", "e"))
-
-    val indexes = Array(1, 2)
-      
-    val fieldComps = Array[TypeComparator[AnyRef]](
-      LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
-      INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] )
-    val booleanOrders = Array(true, false)    
-
-    val rowComp = new RowComparator(
-      rT.getTotalFields,
-      indexes,
-      fieldComps,
-      new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
-      booleanOrders)
-    
-    val collectionRowComparator = new CollectionRowComparator(rowComp)
-    
-    val inputCRowType = CRowTypeInfo(rT)
-    
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
-      new RowTimeSortProcessFunction(
-        inputCRowType,
-        Some(collectionRowComparator)))
-  
-   val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
-      processFunction, 
-      new TupleRowSelector(0), 
-      BasicTypeInfo.INT_TYPE_INFO)
-    
-   testHarness.open()
-
-   testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime)
-   testHarness.processWatermark(3)
-
-      // timestamp is ignored in processing time
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
-
-    // move watermark forward
-    testHarness.processWatermark(2007)
-
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) // too late
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2019)) // too early
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
-    testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
-
-    // move watermark forward
-    testHarness.processWatermark(2012)
-
-    val result = testHarness.getOutput
-    
-    val expectedOutput = new ConcurrentLinkedQueue[Object]()
-    
-    // all elements at the same proc timestamp have the same value
-    // elements should be sorted ascending on field 1 and descending on field 2
-    // (10,0) (11,1) (12,2) (12,1) (12,0)
-    expectedOutput.add(new Watermark(3))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 1001))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2002))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 2002))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong),true), 2002))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2002))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2004))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2006))
-    expectedOutput.add(new Watermark(2007))
-
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
-
-    expectedOutput.add(new Watermark(2012))
-
-    TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
-        
-    testHarness.close()
-        
-  }
-}
-
-object TimeSortProcessFunctionTest {
-
-  /**
-   * Simple test class that returns a specified field as the selector function
-   */
-  class TupleRowSelector(private val selectorField: Int) extends KeySelector[CRow, Integer] {
-
-    override def getKey(value: CRow): Integer = {
-      value.row.getField(selectorField).asInstanceOf[Integer]
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
new file mode 100644
index 0000000..39b8371
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AggregateITCase(
+    configMode: TableConfigMode)
+  extends TableProgramsCollectionTestBase(configMode) {
+
+  @Test
+  def testAggregationTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231,1,21,21,11"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDataSetAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g), " +
+      "min(g), min('Ciao'), max(g), max('Ciao'), sum(CAST(f AS DECIMAL)) FROM MyTable"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,3,2,1,3"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTableAggregationWithArithmetic(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
+      "FROM MyTable"
+
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "5.5,7"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
+
+    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "2,2"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testAggregationAfterProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
+      "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,3,2"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDistinctAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "231,21"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedDistinctAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected =
+      "6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupingSetAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT _2, _3, avg(_1) as a, GROUP_ID() as g FROM MyTable GROUP BY GROUPING SETS (_2, _3)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+    val expected =
+      "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
+        "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
+        "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" +
+        "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" +
+        "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" +
+        "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" +
+        "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" +
+        "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testAggregateEmptyDataSets(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable where a = 4 group by a"
+
+    val sqlQuery2 = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable where a = 4"
+
+    val sqlQuery3 = "SELECT avg(a), sum(a), count(b) " +
+      "FROM MyTable"
+
+    val ds = env.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short))
+      .toTable(tEnv, 'a, 'b)
+
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+    val result2 = tEnv.sql(sqlQuery2)
+    val result3 = tEnv.sql(sqlQuery3)
+
+    val results = result.toDataSet[Row].collect()
+    val expected = Seq.empty
+    val results2 =  result2.toDataSet[Row].collect()
+    val expected2 = "null,null,0"
+    val results3 = result3.toDataSet[Row].collect()
+    val expected3 = "1,3,2"
+
+    assert(results.equals(expected),
+      "Empty result is expected for grouped set, but actual: " + results)
+    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
+    TestBaseUtils.compareResultAsText(results3.asJava, expected3)
+  }
+
+  @Test
+  def testTumbleWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.registerFunction("countFun", new CountAggFunction)
+    tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" +
+        "FROM T " +
+        "GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1,1,1",
+      "2,2,1,2,2", "2,3,1,2,3",
+      "3,9,2,3,4", "3,6,1,3,6",
+      "4,15,2,4,7", "4,19,2,4,9",
+      "5,11,1,5,11", "5,39,3,5,13", "5,15,1,5,15",
+      "6,33,2,6,16", "6,57,3,6,19", "6,21,1,6,21"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testHopWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.registerFunction("countFun", new CountAggFunction)
+    tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" +
+        "FROM T " +
+        "GROUP BY b, HOP(ts, INTERVAL '2' SECOND, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1,1,1","1,1,1,1,1",
+      "2,5,2,2,2","2,5,2,2,2",
+      "3,9,2,3,4", "3,15,3,3,5", "3,6,1,3,6",
+      "4,7,1,4,7", "4,24,3,4,8", "4,27,3,4,9", "4,10,1,4,10",
+      "5,11,1,5,11", "5,36,3,5,12", "5,54,4,5,13", "5,29,2,5,14",
+      "6,33,2,6,16", "6,70,4,6,17", "6,78,4,6,19", "6,41,2,6,20"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testSessionWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.registerFunction("countFun", new CountAggFunction)
+    tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT MIN(a), MAX(a), SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), " +
+        "wAvgWithMergeAndReset(a, a)" +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .filter(x => (x._2 % 2) == 0)
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "2,10,39,6,3,7",
+      "16,21,111,6,6,18"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+}


Mime
View raw message