flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [30/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
Date Thu, 13 Jul 2017 10:18:39 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
deleted file mode 100644
index 6830b8f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.aggfunctions
-
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in count aggregate function
-  */
-class CountAggFunctionTest extends AggFunctionTestBase[Long, CountAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq("a", "b", null, "c", null, "d", "e", null, "f"),
-    Seq(null, null, null, null, null, null)
-  )
-
-  override def expectedResults: Seq[Long] = Seq(6L, 0L)
-
-  override def aggregator: AggregateFunction[Long, CountAccumulator] = new CountAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
deleted file mode 100644
index 54c2a7d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in max aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MaxAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    maxVal,
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: AggregateFunction[Byte, MaxAccumulator[Byte]] =
-    new ByteMaxAggFunction()
-}
-
-class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: AggregateFunction[Short, MaxAccumulator[Short]] =
-    new ShortMaxAggFunction()
-}
-
-class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] {
-
-  override def minVal = Int.MinValue + 1
-
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Int, MaxAccumulator[Int]] =
-    new IntMaxAggFunction()
-}
-
-class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] {
-
-  override def minVal = Long.MinValue + 1
-
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Long, MaxAccumulator[Long]] =
-    new LongMaxAggFunction()
-}
-
-class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] {
-
-  override def minVal = Float.MinValue / 2
-
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Float, MaxAccumulator[Float]] =
-    new FloatMaxAggFunction()
-}
-
-class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] {
-
-  override def minVal = Double.MinValue / 2
-
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Double, MaxAccumulator[Double]] =
-    new DoubleMaxAggFunction()
-}
-
-class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean, MaxAccumulator[Boolean]] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    true,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: AggregateFunction[Boolean, MaxAccumulator[Boolean]] =
-    new BooleanMaxAggFunction()
-}
-
-class DecimalMaxAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, MaxAccumulator[BigDecimal]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000.000001"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("1000.000001"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, MaxAccumulator[BigDecimal]] =
-    new DecimalMaxAggFunction()
-}
-
-class StringMaxAggFunctionTest extends AggFunctionTestBase[String, MaxAccumulator[String]] {
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new String("a"),
-      new String("b"),
-      new String("c"),
-      null.asInstanceOf[String],
-      new String("d")
-    ),
-    Seq(
-      null.asInstanceOf[String],
-      null.asInstanceOf[String],
-      null.asInstanceOf[String]
-    ),
-    Seq(
-      new String("1House"),
-      new String("Household"),
-      new String("house"),
-      new String("household")
-    )
-  )
-
-  override def expectedResults: Seq[String] = Seq(
-    new String("d"),
-    null.asInstanceOf[String],
-    new String("household")
-  )
-
-  override def aggregator: AggregateFunction[String, MaxAccumulator[String]] =
-    new StringMaxAggFunction()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
deleted file mode 100644
index c11ae41..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in max with retraction aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class MaxWithRetractAggFunctionTest[T: Numeric]
-  extends AggFunctionTestBase[T, MaxWithRetractAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    maxVal,
-    null.asInstanceOf[T]
-  )
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: AggregateFunction[Byte, MaxWithRetractAccumulator[Byte]] =
-    new ByteMaxWithRetractAggFunction()
-}
-
-class ShortMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: AggregateFunction[Short, MaxWithRetractAccumulator[Short]] =
-    new ShortMaxWithRetractAggFunction()
-}
-
-class IntMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Int] {
-
-  override def minVal = Int.MinValue + 1
-
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Int, MaxWithRetractAccumulator[Int]] =
-    new IntMaxWithRetractAggFunction()
-}
-
-class LongMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Long] {
-
-  override def minVal = Long.MinValue + 1
-
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Long, MaxWithRetractAccumulator[Long]] =
-    new LongMaxWithRetractAggFunction()
-}
-
-class FloatMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Float] {
-
-  override def minVal = Float.MinValue / 2
-
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Float, MaxWithRetractAccumulator[Float]] =
-    new FloatMaxWithRetractAggFunction()
-}
-
-class DoubleMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Double] {
-
-  override def minVal = Double.MinValue / 2
-
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Double, MaxWithRetractAccumulator[Double]] =
-    new DoubleMaxWithRetractAggFunction()
-}
-
-class BooleanMaxWithRetractAggFunctionTest
-  extends AggFunctionTestBase[Boolean, MaxWithRetractAccumulator[Boolean]] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    true,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: AggregateFunction[Boolean, MaxWithRetractAccumulator[Boolean]] =
-    new BooleanMaxWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class DecimalMaxWithRetractAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000.000001"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("1000.000001"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, MaxWithRetractAccumulator[BigDecimal]] =
-    new DecimalMaxWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class StringMaxWithRetractAggFunctionTest
-  extends AggFunctionTestBase[String, MaxWithRetractAccumulator[String]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      "abc",
-      "def",
-      "ghi",
-      null,
-      "jkl",
-      null,
-      "zzz"
-    ),
-    Seq(
-      null,
-      null
-    ),
-    Seq(
-      "x",
-      null,
-      "e"
-    )
-  )
-
-  override def expectedResults: Seq[String] = Seq(
-    "zzz",
-    null,
-    "x"
-  )
-
-  override def aggregator: AggregateFunction[String, MaxWithRetractAccumulator[String]] =
-    new StringMaxWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
deleted file mode 100644
index 6f41bd2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in min aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T, MinAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    minVal,
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: AggregateFunction[Byte, MinAccumulator[Byte]] =
-    new ByteMinAggFunction()
-}
-
-class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: AggregateFunction[Short, MinAccumulator[Short]] =
-    new ShortMinAggFunction()
-}
-
-class IntMinAggFunctionTest extends MinAggFunctionTest[Int] {
-
-  override def minVal = Int.MinValue + 1
-
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Int, MinAccumulator[Int]] =
-    new IntMinAggFunction()
-}
-
-class LongMinAggFunctionTest extends MinAggFunctionTest[Long] {
-
-  override def minVal = Long.MinValue + 1
-
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Long, MinAccumulator[Long]] =
-    new LongMinAggFunction()
-}
-
-class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] {
-
-  override def minVal = Float.MinValue / 2
-
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Float, MinAccumulator[Float]] =
-    new FloatMinAggFunction()
-}
-
-class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] {
-
-  override def minVal = Double.MinValue / 2
-
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Double, MinAccumulator[Double]] =
-    new DoubleMinAggFunction()
-}
-
-class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean, MinAccumulator[Boolean]] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    false,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: AggregateFunction[Boolean, MinAccumulator[Boolean]] =
-    new BooleanMinAggFunction()
-}
-
-class DecimalMinAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, MinAccumulator[BigDecimal]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("-999.999"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, MinAccumulator[BigDecimal]] =
-    new DecimalMinAggFunction()
-}
-
-class StringMinAggFunctionTest
-  extends AggFunctionTestBase[String, MinAccumulator[String]] {
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new String("a"),
-      new String("b"),
-      new String("c"),
-      null.asInstanceOf[String],
-      new String("d")
-    ),
-    Seq(
-      null.asInstanceOf[String],
-      null.asInstanceOf[String],
-      null.asInstanceOf[String]
-    ),
-    Seq(
-      new String("1House"),
-      new String("Household"),
-      new String("house"),
-      new String("household")
-    )
-  )
-
-  override def expectedResults: Seq[String] = Seq(
-    new String("a"),
-    null.asInstanceOf[String],
-    new String("1House")
-  )
-
-  override def aggregator: AggregateFunction[String, MinAccumulator[String]] =
-    new StringMinAggFunction()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
deleted file mode 100644
index e13e69b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in Min with retraction aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class MinWithRetractAggFunctionTest[T: Numeric]
-  extends AggFunctionTestBase[T, MinWithRetractAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    minVal,
-    null.asInstanceOf[T]
-  )
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: AggregateFunction[Byte, MinWithRetractAccumulator[Byte]] =
-    new ByteMinWithRetractAggFunction()
-}
-
-class ShortMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: AggregateFunction[Short, MinWithRetractAccumulator[Short]] =
-    new ShortMinWithRetractAggFunction()
-}
-
-class IntMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Int] {
-
-  override def minVal = Int.MinValue + 1
-
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Int, MinWithRetractAccumulator[Int]] =
-    new IntMinWithRetractAggFunction()
-}
-
-class LongMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Long] {
-
-  override def minVal = Long.MinValue + 1
-
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: AggregateFunction[Long, MinWithRetractAccumulator[Long]] =
-    new LongMinWithRetractAggFunction()
-}
-
-class FloatMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Float] {
-
-  override def minVal = Float.MinValue / 2
-
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Float, MinWithRetractAccumulator[Float]] =
-    new FloatMinWithRetractAggFunction()
-}
-
-class DoubleMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Double] {
-
-  override def minVal = Double.MinValue / 2
-
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Double, MinWithRetractAccumulator[Double]] =
-    new DoubleMinWithRetractAggFunction()
-}
-
-class BooleanMinWithRetractAggFunctionTest
-  extends AggFunctionTestBase[Boolean, MinWithRetractAccumulator[Boolean]] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    false,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: AggregateFunction[Boolean, MinWithRetractAccumulator[Boolean]] =
-    new BooleanMinWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class DecimalMinWithRetractAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, MinWithRetractAccumulator[BigDecimal]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("-999.999"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, MinWithRetractAccumulator[BigDecimal]] =
-    new DecimalMinWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class StringMinWithRetractAggFunctionTest
-  extends AggFunctionTestBase[String, MinWithRetractAccumulator[String]] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      "abc",
-      "def",
-      "ghi",
-      null,
-      "jkl",
-      null,
-      "zzz"
-    ),
-    Seq(
-      null,
-      null
-    ),
-    Seq(
-      "x",
-      null,
-      "e"
-    )
-  )
-
-  override def expectedResults: Seq[String] = Seq(
-    "abc",
-    null,
-    "e"
-  )
-
-  override def aggregator: AggregateFunction[String, MinWithRetractAccumulator[String]] =
-    new StringMinWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunctionTest.scala
deleted file mode 100644
index ba98fa2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunctionTest.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in sum0 aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class Sum0AggFunctionTestBase[T: Numeric]
-  extends AggFunctionTestBase[T, SumAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def maxVal: T
-
-  private val minVal = numeric.negate(maxVal)
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      numeric.fromInt(2),
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      numeric.fromInt(-10),
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T],
-      maxVal
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    numeric.fromInt(2),
-    0.asInstanceOf[T]
-  )
-}
-
-class ByteSum0AggFunctionTest extends Sum0AggFunctionTestBase[Byte] {
-
-  override def maxVal = (Byte.MaxValue / 2).toByte
-
-  override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] =
-    new ByteSum0AggFunction
-}
-
-class ShortSum0AggFunctionTest extends Sum0AggFunctionTestBase[Short] {
-
-  override def maxVal = (Short.MaxValue / 2).toShort
-
-  override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] =
-    new ShortSum0AggFunction
-}
-
-class IntSum0AggFunctionTest extends Sum0AggFunctionTestBase[Int] {
-
-  override def maxVal = Int.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] =
-    new IntSum0AggFunction
-}
-
-class LongSum0AggFunctionTest extends Sum0AggFunctionTestBase[Long] {
-
-  override def maxVal = Long.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] =
-    new LongSum0AggFunction
-}
-
-class FloatSum0AggFunctionTest extends Sum0AggFunctionTestBase[Float] {
-
-  override def maxVal = 12345.6789f
-
-  override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] =
-    new FloatSum0AggFunction
-}
-
-class DoubleSum0AggFunctionTest extends Sum0AggFunctionTestBase[Double] {
-
-  override def maxVal = 12345.6789d
-
-  override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] =
-    new DoubleSum0AggFunction
-}
-
-
-class DecimalSum0AggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("2"),
-      new BigDecimal("3"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-1000"),
-      new BigDecimal("0.000000000002"),
-      new BigDecimal("1000"),
-      new BigDecimal("-0.000000000001"),
-      new BigDecimal("999.999"),
-      null,
-      new BigDecimal("4"),
-      new BigDecimal("-999.999"),
-      null
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("10.000000000001"),
-    BigDecimal.ZERO
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, DecimalSumAccumulator] =
-    new DecimalSum0AggFunction()
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0WithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0WithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0WithRetractAggFunctionTest.scala
deleted file mode 100644
index 0a8f633..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/Sum0WithRetractAggFunctionTest.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in sum0 with retract aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class Sum0WithRetractAggFunctionTestBase[T: Numeric]
-  extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def maxVal: T
-
-  private val minVal = numeric.negate(maxVal)
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      numeric.fromInt(2),
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      numeric.fromInt(-10),
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T],
-      maxVal
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    numeric.fromInt(2),
-    0.asInstanceOf[T]
-  )
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Byte] {
-
-  override def maxVal = (Byte.MaxValue / 2).toByte
-
-  override def aggregator: AggregateFunction[Byte, SumWithRetractAccumulator[Byte]] =
-    new ByteSum0WithRetractAggFunction
-}
-
-class ShortSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Short] {
-
-  override def maxVal = (Short.MaxValue / 2).toShort
-
-  override def aggregator: AggregateFunction[Short, SumWithRetractAccumulator[Short]] =
-    new ShortSum0WithRetractAggFunction
-}
-
-class IntSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Int] {
-
-  override def maxVal = Int.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Int, SumWithRetractAccumulator[Int]] =
-    new IntSum0WithRetractAggFunction
-}
-
-class LongSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Long] {
-
-  override def maxVal = Long.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Long, SumWithRetractAccumulator[Long]] =
-    new LongSum0WithRetractAggFunction
-}
-
-class FloatSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Float] {
-
-  override def maxVal = 12345.6789f
-
-  override def aggregator: AggregateFunction[Float, SumWithRetractAccumulator[Float]] =
-    new FloatSum0WithRetractAggFunction
-}
-
-class DoubleSum0WithRetractAggFunctionTest extends Sum0WithRetractAggFunctionTestBase[Double] {
-
-  override def maxVal = 12345.6789d
-
-  override def aggregator: AggregateFunction[Double, SumWithRetractAccumulator[Double]] =
-    new DoubleSum0WithRetractAggFunction
-}
-
-
-class DecimalSum0WithRetractAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("2"),
-      new BigDecimal("3"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-1000"),
-      new BigDecimal("0.000000000002"),
-      new BigDecimal("1000"),
-      new BigDecimal("-0.000000000001"),
-      new BigDecimal("999.999"),
-      null,
-      new BigDecimal("4"),
-      new BigDecimal("-999.999"),
-      null
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("10.000000000001"),
-    BigDecimal.ZERO
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, DecimalSumWithRetractAccumulator] =
-    new DecimalSum0WithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
deleted file mode 100644
index 6816a5e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in sum aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class SumAggFunctionTestBase[T: Numeric]
-  extends AggFunctionTestBase[T, SumAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def maxVal: T
-
-  private val minVal = numeric.negate(maxVal)
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      numeric.fromInt(2),
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      numeric.fromInt(-10),
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T],
-      maxVal
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    numeric.fromInt(2),
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] {
-
-  override def maxVal = (Byte.MaxValue / 2).toByte
-
-  override def aggregator: AggregateFunction[Byte, SumAccumulator[Byte]] =
-    new ByteSumAggFunction
-}
-
-class ShortSumAggFunctionTest extends SumAggFunctionTestBase[Short] {
-
-  override def maxVal = (Short.MaxValue / 2).toShort
-
-  override def aggregator: AggregateFunction[Short, SumAccumulator[Short]] =
-    new ShortSumAggFunction
-}
-
-class IntSumAggFunctionTest extends SumAggFunctionTestBase[Int] {
-
-  override def maxVal = Int.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Int, SumAccumulator[Int]] =
-    new IntSumAggFunction
-}
-
-class LongSumAggFunctionTest extends SumAggFunctionTestBase[Long] {
-
-  override def maxVal = Long.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Long, SumAccumulator[Long]] =
-    new LongSumAggFunction
-}
-
-class FloatSumAggFunctionTest extends SumAggFunctionTestBase[Float] {
-
-  override def maxVal = 12345.6789f
-
-  override def aggregator: AggregateFunction[Float, SumAccumulator[Float]] =
-    new FloatSumAggFunction
-}
-
-class DoubleSumAggFunctionTest extends SumAggFunctionTestBase[Double] {
-
-  override def maxVal = 12345.6789d
-
-  override def aggregator: AggregateFunction[Double, SumAccumulator[Double]] =
-    new DoubleSumAggFunction
-}
-
-
-class DecimalSumAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, DecimalSumAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("2"),
-      new BigDecimal("3"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-1000"),
-      new BigDecimal("0.000000000002"),
-      new BigDecimal("1000"),
-      new BigDecimal("-0.000000000001"),
-      new BigDecimal("999.999"),
-      null,
-      new BigDecimal("4"),
-      new BigDecimal("-999.999"),
-      null
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("10.000000000001"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, DecimalSumAccumulator] =
-    new DecimalSumAggFunction()
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
deleted file mode 100644
index 8b6daba..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test case for built-in sum with retract aggregate function
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class SumWithRetractAggFunctionTestBase[T: Numeric]
-  extends AggFunctionTestBase[T, SumWithRetractAccumulator[T]] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def maxVal: T
-
-  private val minVal = numeric.negate(maxVal)
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      numeric.fromInt(2),
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      numeric.fromInt(-10),
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T],
-      maxVal
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    numeric.fromInt(2),
-    null.asInstanceOf[T]
-  )
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Byte] {
-
-  override def maxVal = (Byte.MaxValue / 2).toByte
-
-  override def aggregator: AggregateFunction[Byte, SumWithRetractAccumulator[Byte]] =
-    new ByteSumWithRetractAggFunction
-}
-
-class ShortSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Short] {
-
-  override def maxVal = (Short.MaxValue / 2).toShort
-
-  override def aggregator: AggregateFunction[Short, SumWithRetractAccumulator[Short]] =
-    new ShortSumWithRetractAggFunction
-}
-
-class IntSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Int] {
-
-  override def maxVal = Int.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Int, SumWithRetractAccumulator[Int]] =
-    new IntSumWithRetractAggFunction
-}
-
-class LongSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Long] {
-
-  override def maxVal = Long.MaxValue / 2
-
-  override def aggregator: AggregateFunction[Long, SumWithRetractAccumulator[Long]] =
-    new LongSumWithRetractAggFunction
-}
-
-class FloatSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Float] {
-
-  override def maxVal = 12345.6789f
-
-  override def aggregator: AggregateFunction[Float, SumWithRetractAccumulator[Float]] =
-    new FloatSumWithRetractAggFunction
-}
-
-class DoubleSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Double] {
-
-  override def maxVal = 12345.6789d
-
-  override def aggregator: AggregateFunction[Double, SumWithRetractAccumulator[Double]] =
-    new DoubleSumWithRetractAggFunction
-}
-
-
-class DecimalSumWithRetractAggFunctionTest
-  extends AggFunctionTestBase[BigDecimal, DecimalSumWithRetractAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("2"),
-      new BigDecimal("3"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-1000"),
-      new BigDecimal("0.000000000002"),
-      new BigDecimal("1000"),
-      new BigDecimal("-0.000000000001"),
-      new BigDecimal("999.999"),
-      null,
-      new BigDecimal("4"),
-      new BigDecimal("-999.999"),
-      null
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("10.000000000001"),
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, DecimalSumWithRetractAccumulator] =
-    new DecimalSumWithRetractAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
new file mode 100644
index 0000000..a15f1d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class ExpressionReductionRulesTest extends TableTestBase {
+
+  @Test // constants in both the projection and the WHERE clause must be folded (batch SQL)
+  def testReduceCalcExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X' " + // trailing space separates the literal from FROM
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01.123 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test // constants in a projection-only query must be folded (batch SQL)
+  def testReduceProjectExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X' " + // trailing space separates the literal from FROM
+      "FROM MyTable"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01.123 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test // a constant expression in the WHERE clause must be folded (batch SQL)
+  def testReduceFilterExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "* " + // trailing space separates * from FROM
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test // constants in both the projection and the WHERE clause must be folded (stream SQL)
+  def testReduceCalcExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X' " + // trailing space separates the literal from FROM
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01.123 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test // constants in a projection-only query must be folded (stream SQL)
+  def testReduceProjectExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X' " + // trailing space separates the literal from FROM
+      "FROM MyTable"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01.123 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test // a constant expression in the WHERE clause must be folded (stream SQL)
+  def testReduceFilterExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "* " + // trailing space separates * from FROM
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // Table API on a stream table, projection only: every selected constant expression is
+  // expected to appear as a literal in the optimized plan.
+  @Test
+  def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+    val streamUtil = streamTestUtil()
+    val source = streamUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val projected = source.select(
+      (3 + 4).toExpr + 6,
+      (11 === 1) ? ("a", "b"),
+      " STRING ".trim,
+      "test" + "string",
+      "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+      1.isNull,
+      "TEST".like("%EST"),
+      2.5.toExpr.floor(),
+      true.cast(Types.STRING) + "X")
+
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term(
+        "select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"))
+
+    streamUtil.verifyTable(projected, expectedPlan)
+  }
+
+  // Table API on a stream table, filter only: the predicate operand (1 + 7) is expected as
+  // the literal 8 in the optimized plan.
+  @Test
+  def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+    val streamUtil = streamTestUtil()
+    val source = streamUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val filtered = source.where('a > (1 + 7))
+
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)"))
+
+    streamUtil.verifyTable(filtered, expectedPlan)
+  }
+
+  // A SQL result is registered as a table and queried again (stream environment); the
+  // constant part of the inner projection must still be reduced.
+  @Test
+  def testNestedTablesReductionStream(): Unit = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val innerResult = streamUtil.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    streamUtil.tableEnv.registerTable("NewTable", innerResult)
+
+    // 1+1 should be normalized to 2
+    val expectedPlan =
+      unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
+
+    streamUtil.verifySql("SELECT a FROM NewTable", expectedPlan)
+  }
+
+  // A SQL result is registered as a table and queried again (batch environment); the
+  // constant part of the inner projection must still be reduced.
+  @Test
+  def testNestedTablesReductionBatch(): Unit = {
+    val batchUtil = batchTestUtil()
+    batchUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val innerResult = batchUtil.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    batchUtil.tableEnv.registerTable("NewTable", innerResult)
+
+    // 1+1 should be normalized to 2
+    val expectedPlan =
+      unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
+
+    batchUtil.verifySql("SELECT a FROM NewTable", expectedPlan)
+  }
+
+  // Expects that a filter on the result of a deterministic UDF which always returns null
+  // is removed entirely, leaving only the table scan in the plan.
+  //
+  // TODO this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
+  // @Test is required next to @Ignore: without it JUnit does not treat the method as a
+  // test at all, so it would neither be reported as ignored nor run once @Ignore is removed.
+  @Ignore
+  @Test
+  def testReduceDeterministicUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    // if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
+    val result = table
+      .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+      .where("d.isNull")
+      .select('a, 'b, 'c)
+
+    val expected: String = streamTableNode(0)
+
+    util.verifyTable(result, expected)
+  }
+
+  // A UDF that reports itself as non-deterministic must stay in the plan: the expected
+  // filter still contains the function call inside IS NULL.
+  @Test
+  def testReduceNonDeterministicUDF(): Unit = {
+    val streamUtil = streamTestUtil()
+    val source = streamUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val withUdfColumn = source.select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
+    val onlyNulls = withUdfColumn.where("d.isNull")
+    val projected = onlyNulls.select('a, 'b, 'c)
+
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())"))
+
+    streamUtil.verifyTable(projected, expectedPlan)
+  }
+}
+
+/**
+  * Scalar function for tests: takes no arguments, always evaluates to null and declares
+  * itself as non-deterministic.
+  */
+object NonDeterministicNullFunc extends ScalarFunction {
+  override def isDeterministic = false
+
+  def eval(): String = null
+}
+
+/**
+  * Scalar function for tests: takes no arguments, always evaluates to null and declares
+  * itself as deterministic.
+  */
+object DeterministicNullFunc extends ScalarFunction {
+  override def isDeterministic = true
+
+  def eval(): String = null
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
new file mode 100644
index 0000000..3ca9232
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/NormalizationRulesTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan
+
+import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule
+import org.apache.calcite.tools.RuleSets
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Checks that a normalization rule set supplied through a [[CalciteConfig]] is applied to
+  * SQL queries, once with the batch and once with the stream test utility.
+  */
+class NormalizationRulesTest extends TableTestBase {
+
+  // Query shared by both tests; the text is used verbatim (no blank in front of FROM).
+  private val distinctCountQuery = "SELECT " +
+    "COUNT(DISTINCT a)" +
+    "FROM MyTable group by b"
+
+  @Test
+  def testApplyNormalizationRuleForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+
+    // rewrite distinct aggregate
+    util.tableEnv.getConfig.setCalciteConfig(distinctRewriteOnlyConfig())
+
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    // expect double aggregate
+    util.verifySql(distinctCountQuery, expectedDoubleAggregate("_DataSetTable_0"))
+  }
+
+  @Test
+  def testApplyNormalizationRuleForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+
+    // rewrite distinct aggregate
+    util.tableEnv.getConfig.setCalciteConfig(distinctRewriteOnlyConfig())
+
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    // expect double aggregate
+    util.verifySql(distinctCountQuery, expectedDoubleAggregate("_DataStreamTable_0"))
+  }
+
+  /**
+    * Builds a config whose normalization rule set contains only
+    * `AggregateExpandDistinctAggregatesRule.JOIN`; the logical and physical optimization
+    * rule sets are replaced by empty lists.
+    */
+  private def distinctRewriteOnlyConfig(): CalciteConfig =
+    new CalciteConfigBuilder()
+      .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
+      .replaceLogicalOptRuleSet(RuleSets.ofList())
+      .replacePhysicalOptRuleSet(RuleSets.ofList())
+      .build()
+
+  /**
+    * Expected logical plan for [[distinctCountQuery]]: two stacked aggregates between two
+    * projections.
+    *
+    * @param scannedTable internal name of the scanned table, without the surrounding brackets
+    * @return the plan in the string format expected by `verifySql`
+    */
+  private def expectedDoubleAggregate(scannedTable: String): String =
+    unaryNode("LogicalProject",
+      unaryNode("LogicalAggregate",
+        unaryNode("LogicalAggregate",
+          unaryNode("LogicalProject",
+            values("LogicalTableScan", term("table", s"[$scannedTable]")),
+            term("b", "$1"), term("a", "$0")),
+          term("group", "{0, 1}")),
+        term("group", "{0}"), term("EXPR$0", "COUNT($1)")
+      ),
+      term("EXPR$0", "$1")
+    )
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/QueryDecorrelationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/QueryDecorrelationTest.scala
new file mode 100644
index 0000000..0c3796f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/QueryDecorrelationTest.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Plan tests for batch SQL queries with a correlated scalar sub-query; the expected plans
+  * contain joins against an aggregate instead of a correlated sub-query.
+  */
+class QueryDecorrelationTest extends TableTestBase {
+
+  @Test
+  def testCorrelationScalarAggAndFilter(): Unit = {
+    val batchUtil = batchTestUtil()
+    batchUtil.addTable[(Int, String, String, Int, Int)](
+      "emp", 'empno, 'ename, 'job, 'salary, 'deptno)
+    batchUtil.addTable[(Int, String)]("dept", 'deptno, 'name)
+
+    val query = "SELECT e1.empno\n" +
+      "FROM emp e1, dept d1 where e1.deptno = d1.deptno\n" +
+      "and e1.deptno < 10 and d1.deptno < 15\n" +
+      "and e1.salary > (select avg(salary) from emp e2 where e1.empno = e2.empno)"
+
+    // scan of table 0 with the deptno < 10 filter
+    val filteredEmp = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "empno", "salary", "deptno"),
+      term("where", "<(deptno, 10)"))
+
+    // scan of table 1 with the deptno < 15 filter
+    val filteredDept = unaryNode(
+      "DataSetCalc",
+      batchTableNode(1),
+      term("select", "deptno"),
+      term("where", "<(deptno, 15)"))
+
+    // equi-join of both filtered inputs, reduced to the columns needed further up
+    val empDeptJoin = binaryNode(
+      "DataSetJoin",
+      filteredEmp,
+      filteredDept,
+      term("where", "=(deptno, deptno0)"),
+      term("join", "empno", "salary", "deptno", "deptno0"),
+      term("joinType", "InnerJoin"))
+    val joinedEmp = unaryNode("DataSetCalc", empDeptJoin, term("select", "empno", "salary"))
+
+    // average salary per empno, computed over table 0
+    val nonNullEmp = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "empno", "salary"),
+      term("where", "IS NOT NULL(empno)"))
+    val avgSalaryPerEmp = unaryNode(
+      "DataSetAggregate",
+      nonNullEmp,
+      term("groupBy", "empno"),
+      term("select", "empno", "AVG(salary) AS EXPR$0"))
+
+    val salaryJoin = binaryNode(
+      "DataSetJoin",
+      joinedEmp,
+      avgSalaryPerEmp,
+      term("where", "AND(=(empno, empno0), >(salary, EXPR$0))"),
+      term("join", "empno", "salary", "empno0", "EXPR$0"),
+      term("joinType", "InnerJoin"))
+
+    val expectedPlan = unaryNode("DataSetCalc", salaryJoin, term("select", "empno"))
+
+    batchUtil.verifySql(query, expectedPlan)
+  }
+
+  @Test
+  def testDecorrelateWithMultiAggregate(): Unit = {
+    val batchUtil = batchTestUtil()
+    batchUtil.addTable[(Int, String, String, Int, Int)](
+      "emp", 'empno, 'ename, 'job, 'salary, 'deptno)
+    batchUtil.addTable[(Int, String)]("dept", 'deptno, 'name)
+
+    val query = "select sum(e1.empno) from emp e1, dept d1 " +
+      "where e1.deptno = d1.deptno " +
+      "and e1.salary > (" +
+      "    select avg(e2.salary) from emp e2 where e2.deptno = d1.deptno" +
+      ")"
+
+    // single null row that is unioned with the join result below the outer aggregate
+    val nullRow = values(
+      "DataSetValues",
+      tuples(List(null)),
+      term("values", "empno"))
+
+    val empColumns = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "empno", "salary", "deptno"))
+    val deptColumns = unaryNode(
+      "DataSetCalc",
+      batchTableNode(1),
+      term("select", "deptno"))
+
+    val empDeptJoin = binaryNode(
+      "DataSetJoin",
+      empColumns,
+      deptColumns,
+      term("where", "=(deptno, deptno0)"),
+      term("join", "empno", "salary", "deptno", "deptno0"),
+      term("joinType", "InnerJoin"))
+    val joinedEmp = unaryNode(
+      "DataSetCalc",
+      empDeptJoin,
+      term("select", "empno", "salary", "deptno0"))
+
+    // average salary per deptno, computed over table 0
+    val nonNullDept = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "salary", "deptno"),
+      term("where", "IS NOT NULL(deptno)"))
+    val avgSalaryPerDept = unaryNode(
+      "DataSetAggregate",
+      nonNullDept,
+      term("groupBy", "deptno"),
+      term("select", "deptno", "AVG(salary) AS EXPR$0"))
+
+    val salaryJoin = binaryNode(
+      "DataSetJoin",
+      joinedEmp,
+      avgSalaryPerDept,
+      term("where", "AND(=(deptno0, deptno), >(salary, EXPR$0))"),
+      term("join", "empno", "salary", "deptno0", "deptno", "EXPR$0"),
+      term("joinType", "InnerJoin"))
+    val matchingEmpnos = unaryNode("DataSetCalc", salaryJoin, term("select", "empno"))
+
+    val unionWithNull = binaryNode(
+      "DataSetUnion",
+      nullRow,
+      matchingEmpnos,
+      term("union", "empno"))
+
+    val expectedPlan = unaryNode(
+      "DataSetAggregate",
+      unionWithNull,
+      term("select", "SUM(empno) AS EXPR$0"))
+
+    batchUtil.verifySql(query, expectedPlan)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
new file mode 100644
index 0000000..535bbf5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.nodes.datastream._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+/**
+  * Plan tests for the retraction-related traits of streaming plans. Every expected string
+  * lists, per plan node, the update-as-retraction trait followed by the accumulation-mode
+  * trait, in the format produced by `TraitUtil.toString`.
+  */
+class RetractionRulesTest extends TableTestBase {
+
+  // Trait pairs as they are printed inside the parentheses of a node.
+  private val defaultTraits = "false, Acc"
+  private val retractTraits = "true, AccRetract"
+
+  // Scan node as it is expected below every aggregation in these tests.
+  private val retractingScan = "DataStreamScan(true, Acc)"
+
+  def streamTestForRetractionUtil(): StreamTableTestForRetractionUtil = {
+    new StreamTableTestForRetractionUtil()
+  }
+
+  /**
+    * Expected plan of a Calc on top of the given aggregation node, both carrying the
+    * default traits.
+    *
+    * @param aggregate name of the aggregation node
+    * @param input expected plan of the aggregation's input
+    */
+  private def defaultCalcOver(aggregate: String, input: String): String =
+    unaryNode(
+      "DataStreamCalc",
+      unaryNode(aggregate, input, defaultTraits),
+      defaultTraits)
+
+  /** Expected plan of an unbounded groupBy (Calc over GroupAggregate) in retraction mode. */
+  private def retractingGroupBy: String =
+    unaryNode(
+      "DataStreamCalc",
+      unaryNode("DataStreamGroupAggregate", retractingScan, retractTraits),
+      retractTraits)
+
+  @Test
+  def testSelect(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table.select('word, 'number)
+
+    util.verifyTableTrait(resultTable, "DataStreamScan(false, Acc)")
+  }
+
+  // one level unbounded groupBy
+  @Test
+  def testGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('number.count)
+
+    val expected = defaultCalcOver("DataStreamGroupAggregate", retractingScan)
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // two level unbounded groupBy
+  @Test
+  def testTwoGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected = defaultCalcOver("DataStreamGroupAggregate", retractingGroupBy)
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window
+  @Test
+  def testGroupWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number, 'rowtime.rowtime)
+
+    val resultTable = table
+      .window(Tumble over 50.milli on 'rowtime as 'w)
+      .groupBy('w, 'word)
+      .select('word, 'number.count as 'count)
+
+    val expected = defaultCalcOver("DataStreamGroupWindowAggregate", retractingScan)
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // group window after unbounded groupBy
+  @Test
+  @Ignore // cannot pass rowtime through non-windowed aggregation
+  def testGroupWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val table = util.addTable[(String, Int)]('word, 'number, 'rowtime.rowtime)
+
+    val resultTable = table
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .window(Tumble over 50.milli on 'rowtime as 'w)
+      .groupBy('w, 'count)
+      .select('count, 'count.count as 'frequency)
+
+    val expected = defaultCalcOver("DataStreamGroupWindowAggregate", retractingGroupBy)
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  // over window
+  @Test
+  def testOverWindow(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number, 'proctime.proctime)
+
+    val sqlQuery =
+      "SELECT " +
+        "word, count(number) " +
+        "OVER (PARTITION BY word ORDER BY proctime " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+        "FROM T1"
+
+    val expected = defaultCalcOver("DataStreamOverAggregate", retractingScan)
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+
+  // over window after unbounded groupBy
+  @Test
+  @Ignore // cannot pass rowtime through non-windowed aggregation
+  def testOverWindowAfterGroupBy(): Unit = {
+    val util = streamTestForRetractionUtil()
+    util.addTable[(String, Int)]("T1", 'word, 'number, 'proctime.proctime)
+
+    val sqlQuery =
+      "SELECT " +
+        "_count, count(word) " +
+        "OVER (PARTITION BY _count ORDER BY proctime " +
+        "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+        "FROM " +
+        "(SELECT word, count(number) as _count FROM T1 GROUP BY word) "
+
+    val expected = defaultCalcOver("DataStreamOverAggregate", retractingGroupBy)
+
+    util.verifySqlTrait(sqlQuery, expected)
+  }
+
+  // test binaryNode
+  @Test
+  def testBinaryNode(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(String, Int)]('word, 'number)
+    val rTable = util.addTable[(String, Long)]('word_r, 'count_r)
+
+    val resultTable = lTable
+      .groupBy('word)
+      .select('word, 'number.count as 'count)
+      .unionAll(rTable)
+      .groupBy('count)
+      .select('count, 'count.count as 'frequency)
+
+    // union of the grouped left side with the plain scan of the right side
+    val union = binaryNode(
+      "DataStreamUnion",
+      retractingGroupBy,
+      retractingScan,
+      retractTraits)
+
+    val expected = defaultCalcOver(
+      "DataStreamGroupAggregate",
+      unaryNode("DataStreamCalc", union, retractTraits))
+
+    util.verifyTableTrait(resultTable, expected)
+  }
+}
+
+/**
+  * Stream test utility that compares the trait rendering of an optimized plan (see
+  * `TraitUtil.toString`) with an expected string.
+  */
+class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
+
+  /** Runs `query` through the table environment and checks the traits of its plan. */
+  def verifySqlTrait(query: String, expected: String): Unit = {
+    val sqlResult = tableEnv.sql(query)
+    verifyTableTrait(sqlResult, expected)
+  }
+
+  /**
+    * Optimizes the plan of `resultTable` with `updatesAsRetraction = false` and asserts that
+    * its trait rendering equals `expected`. Both sides are compared line by line after
+    * trimming each line.
+    */
+  def verifyTableTrait(resultTable: Table, expected: String): Unit = {
+    // trims every line so that indentation differences do not matter
+    def trimLines(plan: String): String = plan.split("\n").map(_.trim).mkString("\n")
+
+    val optimizedPlan = tableEnv.optimize(resultTable.getRelNode, updatesAsRetraction = false)
+    val rendered = TraitUtil.toString(optimizedPlan)
+
+    assertEquals(trimLines(expected), trimLines(rendered))
+  }
+}
+
+/** Renders a plan as text that shows the retraction-related traits of each node. */
+object TraitUtil {
+
+  /**
+    * Renders `rel` and, recursively, all of its inputs.
+    *
+    * Each node produces one line of the form `ClassName(updateAsRetraction, accMode)`; the
+    * lines of its inputs follow in input order.
+    *
+    * @param rel root of the (sub-)plan to render
+    * @return the rendered node line followed by the rendering of its inputs
+    */
+  def toString(rel: RelNode): String = {
+    val className = rel.getClass.getSimpleName
+
+    // renderings of all inputs, concatenated without a separator
+    val childString = (0 until rel.getInputs.size())
+      .map(i => TraitUtil.toString(rel.getInput(i)))
+      .mkString
+
+    val retractString = rel.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).toString
+    val accModeString = rel.getTraitSet.getTrait(AccModeTraitDef.INSTANCE).toString
+
+    s"""$className($retractString, $accModeString)
+       |$childString
+       |""".stripMargin.stripLineEnd
+  }
+}
+


Mime
View raw message