flink-commits mailing list archives

From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-5956] [table] Add retract method for AggregateFunction.
Date Tue, 07 Mar 2017 17:56:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 821da81fe -> 53fb8f3b5


[FLINK-5956] [table] Add retract method for AggregateFunction.

This closes #3470.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd801aa5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd801aa5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd801aa5

Branch: refs/heads/master
Commit: cd801aa5c5af0a5f1facd78c8464df9aef95f094
Parents: 821da81
Author: shaoxuan-wang <wshaoxuan@gmail.com>
Authored: Sat Mar 4 18:10:52 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Mar 7 13:07:06 2017 +0100

----------------------------------------------------------------------
 .../table/functions/AggregateFunction.scala     |  14 ++
 .../functions/aggfunctions/AvgAggFunction.scala |  42 +++-
 .../aggfunctions/CountAggFunction.scala         |   6 +
 .../functions/aggfunctions/MaxAggFunction.scala |  13 --
 .../MaxAggFunctionWithRetract.scala             | 209 +++++++++++++++++++
 .../functions/aggfunctions/MinAggFunction.scala |  13 --
 .../MinAggFunctionWithRetract.scala             | 209 +++++++++++++++++++
 .../functions/aggfunctions/SumAggFunction.scala |   2 +-
 .../SumWithRetractAggFunction.scala             | 197 +++++++++++++++++
 .../table/runtime/aggregate/AggregateUtil.scala | 207 ++++++++++++------
 .../aggfunctions/AggFunctionTestBase.scala      |  56 +++--
 .../aggfunctions/MaxAggFunctionTest.scala       |   6 +
 .../MaxWithRetractAggFunctionTest.scala         | 188 +++++++++++++++++
 .../aggfunctions/MinAggFunctionTest.scala       |   8 +-
 .../MinWithRetractAggFunctionTest.scala         | 188 +++++++++++++++++
 .../aggfunctions/SumAggFunctionTest.scala       |   4 +
 .../SumWithRetractAggFunctionTest.scala         | 147 +++++++++++++
 17 files changed, 1399 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
index e5666ce..967d2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.functions
 import java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
 
 /**
   * Base class for User-Defined Aggregates.
@@ -35,6 +36,19 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def createAccumulator(): Accumulator
 
   /**
+    * Retracts the input value from the accumulator instance. The current design assumes that
+    * the inputs are values that have previously been accumulated.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param input       the input value (usually obtained from newly arrived data)
+    */
+  def retract(accumulator: Accumulator, input: Any): Unit = {
+    throw TableException("Retract is an optional method. There is no default implementation. " +
+                           "You must implement it yourself.")
+  }
+
+  /**
     * Called every time when an aggregation result should be materialized.
     * The returned value could be either an early and incomplete result
     * (periodically emitted as data arrive) or the final result of the
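
For illustration, here is a minimal sketch of a user-defined aggregate that opts into the new
optional retract() method. The names MySumAccumulator and MyRetractableSum are hypothetical and
not part of this commit; the sketch assumes the base-class API shown above and leaves
getAccumulatorType() at its default.

    import java.util.{List => JList}

    import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

    // Hypothetical accumulator: a single running sum (f0 is the Flink tuple field).
    class MySumAccumulator extends JTuple1[Long] with Accumulator {
      f0 = 0L
    }

    // Hypothetical user-defined aggregate that supports retraction.
    class MyRetractableSum extends AggregateFunction[Long] {

      override def createAccumulator(): Accumulator = new MySumAccumulator

      override def accumulate(accumulator: Accumulator, input: Any): Unit = {
        if (input != null) {
          val acc = accumulator.asInstanceOf[MySumAccumulator]
          acc.f0 += input.asInstanceOf[Number].longValue()
        }
      }

      // Without this override, the base class would throw a TableException at runtime.
      override def retract(accumulator: Accumulator, input: Any): Unit = {
        if (input != null) {
          val acc = accumulator.asInstanceOf[MySumAccumulator]
          acc.f0 -= input.asInstanceOf[Number].longValue()
        }
      }

      override def getValue(accumulator: Accumulator): Long = {
        accumulator.asInstanceOf[MySumAccumulator].f0
      }

      override def merge(accumulators: JList[Accumulator]): Accumulator = {
        val ret = new MySumAccumulator
        var i = 0
        while (i < accumulators.size()) {
          ret.f0 += accumulators.get(i).asInstanceOf[MySumAccumulator].f0
          i += 1
        }
        ret
      }
    }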

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
index 534bb03..dad4d7f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
@@ -51,6 +51,15 @@ abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[Number].longValue()
+      val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
+      accum.f0 -= v
+      accum.f1 -= 1L
+    }
+  }
+
   override def getValue(accumulator: Accumulator): T = {
     val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
     if (accum.f1 == 0) {
@@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[Long]
+      val a = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
+      a.f0 = a.f0.subtract(BigInteger.valueOf(v))
+      a.f1 -= 1L
+    }
+  }
+
   override def getValue(accumulator: Accumulator): T = {
     val a = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
     if (a.f1 == 0) {
@@ -209,6 +227,15 @@ abstract class FloatingAvgAggFunction[T] extends AggregateFunction[T] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[Number].doubleValue()
+      val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
+      accum.f0 -= v
+      accum.f1 -= 1L
+    }
+  }
+
   override def getValue(accumulator: Accumulator): T = {
     val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
     if (accum.f1 == 0) {
@@ -281,15 +308,20 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
     if (value != null) {
       val v = value.asInstanceOf[BigDecimal]
       val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
-      if (accum.f1 == 0) {
-        accum.f0 = v
-      } else {
-        accum.f0 = accum.f0.add(v)
-      }
+      accum.f0 = accum.f0.add(v)
       accum.f1 += 1L
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[BigDecimal]
+      val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+      accum.f0 = accum.f0.subtract(v)
+      accum.f1 -= 1L
+    }
+  }
+
   override def getValue(accumulator: Accumulator): BigDecimal = {
     val a = accumulator.asInstanceOf[DecimalAvgAccumulator]
     if (a.f1 == 0) {
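
For reference, the retraction added to the Avg functions mirrors accumulate: the value is
subtracted from the running sum and the element count is decremented. A small usage sketch
against DecimalAvgAggFunction follows (the object name is hypothetical; the printed scale of the
BigDecimal results may differ):

    import java.math.BigDecimal

    import org.apache.flink.table.functions.aggfunctions.DecimalAvgAggFunction

    object AvgRetractSketch {
      def main(args: Array[String]): Unit = {
        val avg = new DecimalAvgAggFunction
        val acc = avg.createAccumulator()

        Seq("2", "4", "6").foreach(v => avg.accumulate(acc, new BigDecimal(v)))
        println(avg.getValue(acc)) // sum 12, count 3 -> average 4

        avg.retract(acc, new BigDecimal("6"))
        println(avg.getValue(acc)) // sum 6, count 2 -> average 3
      }
    }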

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
index cf884ed..8191a2f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
@@ -40,6 +40,12 @@ class CountAggFunction extends AggregateFunction[Long] {
     }
   }
 
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      accumulator.asInstanceOf[CountAccumulator].f0 -= 1L
+    }
+  }
+
   override def getValue(accumulator: Accumulator): Long = {
     accumulator.asInstanceOf[CountAccumulator].f0
   }
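
Count retraction only decrements the counter; nothing guards against retracting a value that was
never accumulated, which is exactly the contract spelled out in the new retract() Javadoc. A
short sketch of that contract (the object name is hypothetical):

    import org.apache.flink.table.functions.aggfunctions.CountAggFunction

    object CountRetractSketch {
      def main(args: Array[String]): Unit = {
        val count = new CountAggFunction
        val acc = count.createAccumulator()

        count.accumulate(acc, "a")
        count.retract(acc, "a")
        println(count.getValue(acc)) // 0

        // Retracting a value that was never accumulated violates the documented assumption,
        // so the counter simply goes negative.
        count.retract(acc, "a")
        println(count.getValue(acc)) // -1
      }
    }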

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 33cfd65..1a0a80b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -147,19 +147,6 @@ class BooleanMaxAggFunction extends MaxAggFunction[Boolean] {
   * Built-in Big Decimal Max aggregate function
   */
 class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
-
-  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
-    if (value != null) {
-      val v = value.asInstanceOf[BigDecimal]
-      val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]]
-      if (!accum.f1 || accum.f0.compareTo(v) < 0) {
-        accum.f0 = v
-        accum.f1 = true
-      }
-    }
-  }
-
   override def getInitValue = BigDecimal.ZERO
-
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
new file mode 100644
index 0000000..3d83121
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Max with retraction aggregate function */
+class MaxWithRetractAccumulator[T] extends JTuple2[T, JHashMap[T, Long]] with Accumulator
+
+/**
+  * Base class for built-in Max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
+  extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+    val acc = new MaxWithRetractAccumulator[T]
+    acc.f0 = getInitValue //max
+    acc.f1 = new JHashMap[T, Long]() //store the count for each value
+    acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+      if (a.f1.size() == 0 || (ord.compare(a.f0, v) < 0)) {
+        a.f0 = v
+      }
+
+      if (!a.f1.containsKey(v)) {
+        a.f1.put(v, 1L)
+      } else {
+        var count = a.f1.get(v)
+        count += 1L
+        a.f1.put(v, count)
+      }
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+
+      var count = a.f1.get(v)
+      count -= 1L
+      if (count == 0) {
+        // remove the key v from the map if the number of appearances of the value v is 0
+        a.f1.remove(v)
+        // if the total count is 0, we can simply set f0 (the max) to the initial value
+        if (a.f1.size() == 0) {
+          a.f0 = getInitValue
+          return
+        }
+        // if v is the current max value, we have to iterate over the map to find the second
+        // largest value to replace v as the max value
+        if (v == a.f0) {
+          val iterator = a.f1.keySet().iterator()
+          var key = iterator.next()
+          a.f0 = key
+          while (iterator.hasNext()) {
+            key = iterator.next()
+            if (ord.compare(a.f0, key) < 0) {
+              a.f0 = key
+            }
+          }
+        }
+      } else {
+        a.f1.put(v, count)
+      }
+    }
+
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+    val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
+    if (a.f1.size() != 0) {
+      a.f0
+    } else {
+      null.asInstanceOf[T]
+    }
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+    val ret = accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
+    var i: Int = 1
+    while (i < accumulators.size()) {
+      val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
+      if (a.f1.size() != 0) {
+        // set max element
+        if (ord.compare(ret.f0, a.f0) < 0) {
+          ret.f0 = a.f0
+        }
+        // merge the count for each key
+        val iterator = a.f1.keySet().iterator()
+        while (iterator.hasNext()) {
+          val key = iterator.next()
+          if (ret.f1.containsKey(key)) {
+            ret.f1.put(key, ret.f1.get(key) + a.f1.get(key))
+          } else {
+            ret.f1.put(key, a.f1.get(key))
+          }
+        }
+      }
+      i += 1
+    }
+    ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+    new TupleTypeInfo(
+      new MaxWithRetractAccumulator[T].getClass,
+      getValueTypeInfo,
+      new MapTypeInfo(getValueTypeInfo, BasicTypeInfo.LONG_TYPE_INFO))
+  }
+
+  def getInitValue: T
+
+  def getValueTypeInfo: TypeInformation[_]
+}
+
+/**
+  * Built-in Byte Max with retraction aggregate function
+  */
+class ByteMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Byte] {
+  override def getInitValue: Byte = 0.toByte
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+  * Built-in Short Max with retraction aggregate function
+  */
+class ShortMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Short] {
+  override def getInitValue: Short = 0.toShort
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+  * Built-in Int Max with retraction aggregate function
+  */
+class IntMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Int] {
+  override def getInitValue: Int = 0
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+  * Built-in Long Max with retraction aggregate function
+  */
+class LongMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Long] {
+  override def getInitValue: Long = 0L
+  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/**
+  * Built-in Float Max with retraction aggregate function
+  */
+class FloatMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Float] {
+  override def getInitValue: Float = 0.0f
+  override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
+}
+
+/**
+  * Built-in Double Max with retraction aggregate function
+  */
+class DoubleMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Double] {
+  override def getInitValue: Double = 0.0d
+  override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
+}
+
+/**
+  * Built-in Boolean Max with retraction aggregate function
+  */
+class BooleanMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Boolean] {
+  override def getInitValue = false
+  override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+}
+
+/**
+  * Built-in Big Decimal Max with retraction aggregate function
+  */
+class DecimalMaxWithRetractAggFunction extends MaxWithRetractAggFunction[BigDecimal] {
+  override def getInitValue = BigDecimal.ZERO
+  override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
+}
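
The Max-with-retract functions keep a per-value count map so that the maximum can be recomputed
once every occurrence of the current maximum has been retracted. A usage sketch (the object name
is hypothetical):

    import org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction

    object MaxRetractSketch {
      def main(args: Array[String]): Unit = {
        val max = new LongMaxWithRetractAggFunction
        val acc = max.createAccumulator()

        Seq(3L, 7L, 7L, 5L).foreach(v => max.accumulate(acc, v))
        println(max.getValue(acc)) // 7

        max.retract(acc, 7L) // one occurrence of 7 remains, the max is unchanged
        println(max.getValue(acc)) // 7

        max.retract(acc, 7L) // the count of 7 drops to 0, the map is scanned for the new max
        println(max.getValue(acc)) // 5
      }
    }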

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index 1b2d6b0..58a3c24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -147,19 +147,6 @@ class BooleanMinAggFunction extends MinAggFunction[Boolean] {
   * Built-in Big Decimal Min aggregate function
   */
 class DecimalMinAggFunction extends MinAggFunction[BigDecimal] {
-
-  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
-    if (value != null) {
-      val v = value.asInstanceOf[BigDecimal]
-      val accum = accumulator.asInstanceOf[MinAccumulator[BigDecimal]]
-      if (!accum.f1 || accum.f0.compareTo(v) > 0) {
-        accum.f0 = v
-        accum.f1 = true
-      }
-    }
-  }
-
   override def getInitValue: BigDecimal = BigDecimal.ZERO
-
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
new file mode 100644
index 0000000..a08dd25
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Min with retraction aggregate function */
+class MinWithRetractAccumulator[T] extends JTuple2[T, JHashMap[T, Long]] with Accumulator
+
+/**
+  * Base class for built-in Min with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
+  extends AggregateFunction[T] {
+
+  override def createAccumulator(): Accumulator = {
+    val acc = new MinWithRetractAccumulator[T]
+    acc.f0 = getInitValue //min
+    acc.f1 = new JHashMap[T, Long]() //store the count for each value
+    acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[MinWithRetractAccumulator[T]]
+
+      if (a.f1.size() == 0 || (ord.compare(a.f0, v) > 0)) {
+        a.f0 = v
+      }
+
+      if (!a.f1.containsKey(v)) {
+        a.f1.put(v, 1L)
+      } else {
+        var count = a.f1.get(v)
+        count += 1L
+        a.f1.put(v, count)
+      }
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[MinWithRetractAccumulator[T]]
+
+      var count = a.f1.get(v)
+      count -= 1L
+      if (count == 0) {
+        // remove the key v from the map if the number of appearances of the value v is 0
+        a.f1.remove(v)
+        // if the total count is 0, we can simply set f0 (the min) to the initial value
+        if (a.f1.size() == 0) {
+          a.f0 = getInitValue
+          return
+        }
+        // if v is the current min value, we have to iterate over the map to find the second
+        // smallest value to replace v as the min value
+        if (v == a.f0) {
+          val iterator = a.f1.keySet().iterator()
+          var key = iterator.next()
+          a.f0 = key
+          while (iterator.hasNext()) {
+            key = iterator.next()
+            if (ord.compare(a.f0, key) > 0) {
+              a.f0 = key
+            }
+          }
+        }
+      } else {
+        a.f1.put(v, count)
+      }
+    }
+
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+    val a = accumulator.asInstanceOf[MinWithRetractAccumulator[T]]
+    if (a.f1.size() != 0) {
+      a.f0
+    } else {
+      null.asInstanceOf[T]
+    }
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+    val ret = accumulators.get(0).asInstanceOf[MinWithRetractAccumulator[T]]
+    var i: Int = 1
+    while (i < accumulators.size()) {
+      val a = accumulators.get(i).asInstanceOf[MinWithRetractAccumulator[T]]
+      if (a.f1.size() != 0) {
+        // set min element
+        if (ord.compare(ret.f0, a.f0) > 0) {
+          ret.f0 = a.f0
+        }
+        // merge the count for each key
+        val iterator = a.f1.keySet().iterator()
+        while (iterator.hasNext()) {
+          val key = iterator.next()
+          if (ret.f1.containsKey(key)) {
+            ret.f1.put(key, ret.f1.get(key) + a.f1.get(key))
+          } else {
+            ret.f1.put(key, a.f1.get(key))
+          }
+        }
+      }
+      i += 1
+    }
+    ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+    new TupleTypeInfo(
+      new MinWithRetractAccumulator[T].getClass,
+      getValueTypeInfo,
+      new MapTypeInfo(getValueTypeInfo, BasicTypeInfo.LONG_TYPE_INFO))
+  }
+
+  def getInitValue: T
+
+  def getValueTypeInfo: TypeInformation[_]
+}
+
+/**
+  * Built-in Byte Min with retraction aggregate function
+  */
+class ByteMinWithRetractAggFunction extends MinWithRetractAggFunction[Byte] {
+  override def getInitValue: Byte = 0.toByte
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+  * Built-in Short Min with retraction aggregate function
+  */
+class ShortMinWithRetractAggFunction extends MinWithRetractAggFunction[Short] {
+  override def getInitValue: Short = 0.toShort
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+  * Built-in Int Min with retraction aggregate function
+  */
+class IntMinWithRetractAggFunction extends MinWithRetractAggFunction[Int] {
+  override def getInitValue: Int = 0
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+  * Built-in Long Min with retraction aggregate function
+  */
+class LongMinWithRetractAggFunction extends MinWithRetractAggFunction[Long] {
+  override def getInitValue: Long = 0L
+  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/**
+  * Built-in Float Min with retraction aggregate function
+  */
+class FloatMinWithRetractAggFunction extends MinWithRetractAggFunction[Float] {
+  override def getInitValue: Float = 0.0f
+  override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
+}
+
+/**
+  * Built-in Double Min with retraction aggregate function
+  */
+class DoubleMinWithRetractAggFunction extends MinWithRetractAggFunction[Double] {
+  override def getInitValue: Double = 0.0d
+  override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
+}
+
+/**
+  * Built-in Boolean Min with retraction aggregate function
+  */
+class BooleanMinWithRetractAggFunction extends MinWithRetractAggFunction[Boolean] {
+  override def getInitValue: Boolean = false
+  override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+}
+
+/**
+  * Built-in Big Decimal Min with retraction aggregate function
+  */
+class DecimalMinWithRetractAggFunction extends MinWithRetractAggFunction[BigDecimal] {
+  override def getInitValue: BigDecimal = BigDecimal.ZERO
+  override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
+}
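
merge() for the retractable Min/Max functions folds everything into the first accumulator: the
smaller (respectively larger) extreme wins and the per-value counts are added up, so retractions
applied after the merge still behave correctly. A sketch against IntMinWithRetractAggFunction
(the object name is hypothetical):

    import java.util.{ArrayList => JArrayList}

    import org.apache.flink.table.functions.Accumulator
    import org.apache.flink.table.functions.aggfunctions.IntMinWithRetractAggFunction

    object MinMergeSketch {
      def main(args: Array[String]): Unit = {
        val min = new IntMinWithRetractAggFunction
        val a1 = min.createAccumulator()
        val a2 = min.createAccumulator()

        Seq(4, 9).foreach(v => min.accumulate(a1, v))
        Seq(2, 4).foreach(v => min.accumulate(a2, v))

        val accumulators = new JArrayList[Accumulator]()
        accumulators.add(a1)
        accumulators.add(a2)

        val merged = min.merge(accumulators)
        println(min.getValue(merged)) // 2

        min.retract(merged, 2)
        println(min.getValue(merged)) // 4 (its merged count is 2, so it is still in the map)
      }
    }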

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
index 78fdb8e..6c4aba5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
@@ -166,7 +166,7 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
     while (i < accumulators.size()) {
       val a = accumulators.get(i).asInstanceOf[DecimalSumAccumulator]
       if (a.f1) {
-        accumulate(ret, a.f0)
+        ret.f0 = ret.f0.add(a.f0)
         ret.f1 = true
       }
       i += 1

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala
new file mode 100644
index 0000000..ebcf184
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/** The initial accumulator for Sum with retract aggregate function */
+class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with Accumulator
+
+/**
+  * Base class for built-in Sum with retract aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class SumWithRetractAggFunction[T: Numeric] extends AggregateFunction[T] {
+
+  private val numeric = implicitly[Numeric[T]]
+
+  override def createAccumulator(): Accumulator = {
+    val acc = new SumWithRetractAccumulator[T]()
+    acc.f0 = numeric.zero //sum
+    acc.f1 = 0L //total count
+    acc
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+      a.f0 = numeric.plus(a.f0, v)
+      a.f1 += 1
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[T]
+      val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+      a.f0 = numeric.minus(a.f0, v)
+      a.f1 -= 1
+    }
+  }
+
+  override def getValue(accumulator: Accumulator): T = {
+    val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
+    if (a.f1 > 0) {
+      a.f0
+    } else {
+      null.asInstanceOf[T]
+    }
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+    val ret = createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]]
+    var i: Int = 0
+    while (i < accumulators.size()) {
+      val a = accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]]
+      ret.f0 = numeric.plus(ret.f0, a.f0)
+      ret.f1 += a.f1
+      i += 1
+    }
+    ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+    new TupleTypeInfo(
+      (new SumWithRetractAccumulator).getClass,
+      getValueTypeInfo,
+      BasicTypeInfo.LONG_TYPE_INFO)
+  }
+
+  def getValueTypeInfo: TypeInformation[_]
+}
+
+/**
+  * Built-in Byte Sum with retract aggregate function
+  */
+class ByteSumWithRetractAggFunction extends SumWithRetractAggFunction[Byte] {
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+  * Built-in Short Sum with retract aggregate function
+  */
+class ShortSumWithRetractAggFunction extends SumWithRetractAggFunction[Short] {
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+  * Built-in Int Sum with retract aggregate function
+  */
+class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] {
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+  * Built-in Long Sum with retract aggregate function
+  */
+class LongSumWithRetractAggFunction extends SumWithRetractAggFunction[Long] {
+  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/**
+  * Built-in Float Sum with retract aggregate function
+  */
+class FloatSumWithRetractAggFunction extends SumWithRetractAggFunction[Float] {
+  override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
+}
+
+/**
+  * Built-in Double Sum with retract aggregate function
+  */
+class DoubleSumWithRetractAggFunction extends SumWithRetractAggFunction[Double] {
+  override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
+}
+
+/** The initial accumulator for Big Decimal Sum with retract aggregate function */
+class DecimalSumWithRetractAccumulator extends JTuple2[BigDecimal, Long] with Accumulator {
+  f0 = BigDecimal.ZERO
+  f1 = 0L
+}
+
+/**
+  * Built-in Big Decimal Sum with retract aggregate function
+  */
+class DecimalSumWithRetractAggFunction extends AggregateFunction[BigDecimal] {
+
+  override def createAccumulator(): Accumulator = {
+    new DecimalSumWithRetractAccumulator
+  }
+
+  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[BigDecimal]
+      val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
+      accum.f0 = accum.f0.add(v)
+      accum.f1 += 1L
+    }
+  }
+
+  override def retract(accumulator: Accumulator, value: Any): Unit = {
+    if (value != null) {
+      val v = value.asInstanceOf[BigDecimal]
+      val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
+      accum.f0 = accum.f0.subtract(v)
+      accum.f1 -= 1L
+    }
+  }
+
+  override def getValue(accumulator: Accumulator): BigDecimal = {
+    if (accumulator.asInstanceOf[DecimalSumWithRetractAccumulator].f1 == 0) {
+      null.asInstanceOf[BigDecimal]
+    } else {
+      accumulator.asInstanceOf[DecimalSumWithRetractAccumulator].f0
+    }
+  }
+
+  override def merge(accumulators: JList[Accumulator]): Accumulator = {
+    val ret = accumulators.get(0).asInstanceOf[DecimalSumWithRetractAccumulator]
+    var i: Int = 1
+    while (i < accumulators.size()) {
+      val a = accumulators.get(i).asInstanceOf[DecimalSumWithRetractAccumulator]
+      ret.f0 = ret.f0.add(a.f0)
+      ret.f1 += a.f1
+      i += 1
+    }
+    ret
+  }
+
+  override def getAccumulatorType(): TypeInformation[_] = {
+    new TupleTypeInfo(
+      (new DecimalSumWithRetractAccumulator).getClass,
+      BasicTypeInfo.BIG_DEC_TYPE_INFO,
+      BasicTypeInfo.LONG_TYPE_INFO)
+  }
+}
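
For the Decimal variant, getValue() reports no value at all once the counter f1 drops back to 0,
rather than a zero sum. A sketch (the object name is hypothetical):

    import java.math.BigDecimal

    import org.apache.flink.table.functions.aggfunctions.DecimalSumWithRetractAggFunction

    object SumRetractSketch {
      def main(args: Array[String]): Unit = {
        val sum = new DecimalSumWithRetractAggFunction
        val acc = sum.createAccumulator()

        sum.accumulate(acc, new BigDecimal("1.5"))
        sum.accumulate(acc, new BigDecimal("2.5"))
        println(sum.getValue(acc)) // 4.0

        sum.retract(acc, new BigDecimal("1.5"))
        sum.retract(acc, new BigDecimal("2.5"))
        println(sum.getValue(acc)) // null: f1 is back to 0, so no sum is reported
      }
    }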

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index d549c37..745660d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -78,7 +78,8 @@ object AggregateUtil {
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length)
+      groupings.length,
+      false)
 
     val mapReturnType: RowTypeInfo =
       createDataSetAggregateBufferDataType(groupings, aggregates, inputType)
@@ -125,7 +126,8 @@ object AggregateUtil {
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length)
+      groupings.length,
+      false)
 
     val mapReturnType: RowTypeInfo =
       createDataSetAggregateBufferDataType(
@@ -178,7 +180,8 @@ object AggregateUtil {
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length)._2
+      groupings.length,
+      false)._2
 
     // the mapping relation between field index of intermediate aggregate Row and output Row.
     val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -324,7 +327,8 @@ object AggregateUtil {
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length)._2
+      groupings.length,
+      false)._2
 
     window match {
       case EventTimeSessionGroupWindow(_, _, gap) =>
@@ -365,7 +369,8 @@ object AggregateUtil {
     val (aggFieldIndex, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length)
+      groupings.length,
+      false)
 
     val (groupingOffsetMapping, aggOffsetMapping) =
       getGroupingOffsetAndAggOffsetMapping(
@@ -458,7 +463,11 @@ object AggregateUtil {
     : (ApiAggregateFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
 
     val (aggFields, aggregates) =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        groupKeysIndex.length,
+        false)
 
     val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
 
@@ -487,7 +496,8 @@ object AggregateUtil {
     val aggregateList = transformToAggregateFunctions(
       aggregateCalls,
       inputType,
-      groupKeysCount)._2
+      groupKeysCount,
+      false)._2
 
     doAllSupportPartialMerge(aggregateList)
   }
@@ -607,7 +617,9 @@ object AggregateUtil {
   private def transformToAggregateFunctions(
       aggregateCalls: Seq[AggregateCall],
       inputType: RelDataType,
-      groupKeysCount: Int): (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
+      groupKeysCount: Int,
+      needRetraction: Boolean)
+  : (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
 
     // store the aggregate fields of each aggregate function, by the same order of aggregates.
     val aggFieldIndexes = new Array[Int](aggregateCalls.size)
@@ -636,23 +648,44 @@ object AggregateUtil {
       val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
       aggregateCall.getAggregation match {
         case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
-          aggregates(index) = sqlTypeName match {
-            case TINYINT =>
-              new ByteSumAggFunction
-            case SMALLINT =>
-              new ShortSumAggFunction
-            case INTEGER =>
-              new IntSumAggFunction
-            case BIGINT =>
-              new LongSumAggFunction
-            case FLOAT =>
-              new FloatSumAggFunction
-            case DOUBLE =>
-              new DoubleSumAggFunction
-            case DECIMAL =>
-              new DecimalSumAggFunction
-            case sqlType: SqlTypeName =>
-              throw new TableException("Sum aggregate does no support type:" + sqlType)
+          if (needRetraction) {
+            aggregates(index) = sqlTypeName match {
+              case TINYINT =>
+                new ByteSumWithRetractAggFunction
+              case SMALLINT =>
+                new ShortSumWithRetractAggFunction
+              case INTEGER =>
+                new IntSumWithRetractAggFunction
+              case BIGINT =>
+                new LongSumWithRetractAggFunction
+              case FLOAT =>
+                new FloatSumWithRetractAggFunction
+              case DOUBLE =>
+                new DoubleSumWithRetractAggFunction
+              case DECIMAL =>
+                new DecimalSumWithRetractAggFunction
+              case sqlType: SqlTypeName =>
+                throw new TableException("Sum aggregate does not support type:" + sqlType)
+            }
+          } else {
+            aggregates(index) = sqlTypeName match {
+              case TINYINT =>
+                new ByteSumAggFunction
+              case SMALLINT =>
+                new ShortSumAggFunction
+              case INTEGER =>
+                new IntSumAggFunction
+              case BIGINT =>
+                new LongSumAggFunction
+              case FLOAT =>
+                new FloatSumAggFunction
+              case DOUBLE =>
+                new DoubleSumAggFunction
+              case DECIMAL =>
+                new DecimalSumAggFunction
+              case sqlType: SqlTypeName =>
+                throw new TableException("Sum aggregate does not support type:" + sqlType)
+            }
           }
         }
         case _: SqlAvgAggFunction => {
@@ -677,46 +710,94 @@ object AggregateUtil {
         }
         case sqlMinMaxFunction: SqlMinMaxAggFunction => {
           aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMinAggFunction
-              case SMALLINT =>
-                new ShortMinAggFunction
-              case INTEGER =>
-                new IntMinAggFunction
-              case BIGINT =>
-                new LongMinAggFunction
-              case FLOAT =>
-                new FloatMinAggFunction
-              case DOUBLE =>
-                new DoubleMinAggFunction
-              case DECIMAL =>
-                new DecimalMinAggFunction
-              case BOOLEAN =>
-                new BooleanMinAggFunction
-              case sqlType: SqlTypeName =>
-                throw new TableException("Min aggregate does no support type:" + sqlType)
+            if (needRetraction) {
+              sqlTypeName match {
+                case TINYINT =>
+                  new ByteMinWithRetractAggFunction
+                case SMALLINT =>
+                  new ShortMinWithRetractAggFunction
+                case INTEGER =>
+                  new IntMinWithRetractAggFunction
+                case BIGINT =>
+                  new LongMinWithRetractAggFunction
+                case FLOAT =>
+                  new FloatMinWithRetractAggFunction
+                case DOUBLE =>
+                  new DoubleMinWithRetractAggFunction
+                case DECIMAL =>
+                  new DecimalMinWithRetractAggFunction
+                case BOOLEAN =>
+                  new BooleanMinWithRetractAggFunction
+                case sqlType: SqlTypeName =>
+                  throw new TableException("Min with retract aggregate does not support type:" +
+                                             sqlType)
+              }
+            } else {
+              sqlTypeName match {
+                case TINYINT =>
+                  new ByteMinAggFunction
+                case SMALLINT =>
+                  new ShortMinAggFunction
+                case INTEGER =>
+                  new IntMinAggFunction
+                case BIGINT =>
+                  new LongMinAggFunction
+                case FLOAT =>
+                  new FloatMinAggFunction
+                case DOUBLE =>
+                  new DoubleMinAggFunction
+                case DECIMAL =>
+                  new DecimalMinAggFunction
+                case BOOLEAN =>
+                  new BooleanMinAggFunction
+                case sqlType: SqlTypeName =>
+                  throw new TableException("Min aggregate does not support type:" + sqlType)
+              }
             }
           } else {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMaxAggFunction
-              case SMALLINT =>
-                new ShortMaxAggFunction
-              case INTEGER =>
-                new IntMaxAggFunction
-              case BIGINT =>
-                new LongMaxAggFunction
-              case FLOAT =>
-                new FloatMaxAggFunction
-              case DOUBLE =>
-                new DoubleMaxAggFunction
-              case DECIMAL =>
-                new DecimalMaxAggFunction
-              case BOOLEAN =>
-                new BooleanMaxAggFunction
-              case sqlType: SqlTypeName =>
-                throw new TableException("Max aggregate does no support type:" + sqlType)
+            if (needRetraction) {
+              sqlTypeName match {
+                case TINYINT =>
+                  new ByteMaxWithRetractAggFunction
+                case SMALLINT =>
+                  new ShortMaxWithRetractAggFunction
+                case INTEGER =>
+                  new IntMaxWithRetractAggFunction
+                case BIGINT =>
+                  new LongMaxWithRetractAggFunction
+                case FLOAT =>
+                  new FloatMaxWithRetractAggFunction
+                case DOUBLE =>
+                  new DoubleMaxWithRetractAggFunction
+                case DECIMAL =>
+                  new DecimalMaxWithRetractAggFunction
+                case BOOLEAN =>
+                  new BooleanMaxWithRetractAggFunction
+                case sqlType: SqlTypeName =>
+                  throw new TableException("Max with retract aggregate does not support type:" +
+                                             sqlType)
+              }
+            } else {
+              sqlTypeName match {
+                case TINYINT =>
+                  new ByteMaxAggFunction
+                case SMALLINT =>
+                  new ShortMaxAggFunction
+                case INTEGER =>
+                  new IntMaxAggFunction
+                case BIGINT =>
+                  new LongMaxAggFunction
+                case FLOAT =>
+                  new FloatMaxAggFunction
+                case DOUBLE =>
+                  new DoubleMaxAggFunction
+                case DECIMAL =>
+                  new DecimalMaxAggFunction
+                case BOOLEAN =>
+                  new BooleanMaxAggFunction
+                case sqlType: SqlTypeName =>
+                  throw new TableException("Max aggregate does not support type:" + sqlType)
+              }
             }
           }
         }
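
The new needRetraction parameter switches each type dispatch between a plain built-in function
and its *WithRetract counterpart; every call site in this commit still passes false, so runtime
behavior is unchanged for now. A condensed, hypothetical sketch of the pattern, limited to two
SQL types for brevity (createSumFunction and the object name are not part of the commit):

    import org.apache.calcite.sql.`type`.SqlTypeName
    import org.apache.calcite.sql.`type`.SqlTypeName._

    import org.apache.flink.table.api.TableException
    import org.apache.flink.table.functions.AggregateFunction
    import org.apache.flink.table.functions.aggfunctions.{DoubleSumAggFunction,
      DoubleSumWithRetractAggFunction, LongSumAggFunction, LongSumWithRetractAggFunction}

    object SumFunctionSelectionSketch {

      def createSumFunction(
          sqlTypeName: SqlTypeName,
          needRetraction: Boolean): AggregateFunction[_] = {

        if (needRetraction) {
          sqlTypeName match {
            case BIGINT => new LongSumWithRetractAggFunction
            case DOUBLE => new DoubleSumWithRetractAggFunction
            case sqlType => throw new TableException("Sum aggregate does not support type: " + sqlType)
          }
        } else {
          sqlTypeName match {
            case BIGINT => new LongSumAggFunction
            case DOUBLE => new DoubleSumAggFunction
            case sqlType => throw new TableException("Sum aggregate does not support type: " + sqlType)
          }
        }
      }
    }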

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
index 5ba3e34..5c6f7c4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
@@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {
 
   def aggregator: AggregateFunction[T]
 
+  def supportRetraction: Boolean = true
+
   @Test
-  // test aggregate functions without partial merge
-  def testAggregateWithoutMerge(): Unit = {
+  // test aggregate and retract functions without partial merge
+  def testAccumulateAndRetractWithoutMerge(): Unit = {
     // iterate over input sets
     for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-      val accumulator = aggregateVals(vals)
-      val result = aggregator.getValue(accumulator)
-      validateResult(expected, result)
+      val accumulator = accumulateVals(vals)
+      var result = aggregator.getValue(accumulator)
+      validateResult[T](expected, result)
+
+      if (supportRetraction) {
+        retractVals(accumulator, vals)
+        val expectedAccum = aggregator.createAccumulator()
+        // The two accumulators should be exactly the same
+        validateResult[Accumulator](expectedAccum, accumulator)
+      }
     }
   }
 
@@ -57,31 +66,46 @@ abstract class AggFunctionTestBase[T] {
         //equally split the vals sequence into two sequences
         val (firstVals, secondVals) = vals.splitAt(vals.length / 2)
 
+        //1. verify merge with accumulate
         val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
-        accumulators.add(aggregateVals(firstVals))
-        accumulators.add(aggregateVals(secondVals))
+        accumulators.add(accumulateVals(firstVals))
+        accumulators.add(accumulateVals(secondVals))
 
-        val accumulator = aggregator.merge(accumulators)
+        var accumulator = aggregator.merge(accumulators)
         val result = aggregator.getValue(accumulator)
-        validateResult(expected, result)
+        validateResult[T](expected, result)
+
+        //2. verify merge with accumulate & retract
+        if (supportRetraction) {
+          retractVals(accumulator, vals)
+          val expectedAccum = aggregator.createAccumulator()
+          // The two accumulators should be exactly the same
+          validateResult[Accumulator](expectedAccum, accumulator)
+        }
       }
 
       // iterate over input sets
       for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-        //test partial merge with an empty accumulator
+        //3. test partial merge with an empty accumulator
         val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
-        accumulators.add(aggregateVals(vals))
+        accumulators.add(accumulateVals(vals))
         accumulators.add(aggregator.createAccumulator())
 
         val accumulator = aggregator.merge(accumulators)
         val result = aggregator.getValue(accumulator)
-        validateResult(expected, result)
+        validateResult[T](expected, result)
       }
     }
   }
 
-  private def validateResult(expected: T, result: T): Unit = {
+  private def validateResult[T](expected: T, result: T): Unit = {
     (expected, result) match {
+      case (e: DecimalSumWithRetractAccumulator, r: DecimalSumWithRetractAccumulator) =>
+        // BigDecimal.equals() value and scale but we are only interested in value.
+        assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
+      case (e: DecimalAvgAccumulator, r: DecimalAvgAccumulator) =>
+        // BigDecimal.equals() value and scale but we are only interested in value.
+        assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
       case (e: BigDecimal, r: BigDecimal) =>
         // BigDecimal.equals() value and scale but we are only interested in value.
         assert(e.compareTo(r) == 0)
@@ -90,9 +114,13 @@ abstract class AggFunctionTestBase[T] {
     }
   }
 
-  private def aggregateVals(vals: Seq[_]): Accumulator = {
+  private def accumulateVals(vals: Seq[_]): Accumulator = {
     val accumulator = aggregator.createAccumulator()
     vals.foreach(v => aggregator.accumulate(accumulator, v))
     accumulator
   }
+
+  private def retractVals(accumulator: Accumulator, vals: Seq[_]): Unit = {
+    vals.foreach(v => aggregator.retract(accumulator, v))
+  }
 }
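
The reworked test base exercises retraction by accumulating a value set, retracting the same
set, and checking that the accumulator ends up equal to a freshly created one. A standalone
sketch of that round trip for one of the new functions (the object name is hypothetical; the
comparison mirrors the DecimalSumWithRetractAccumulator case in validateResult):

    import java.math.BigDecimal

    import org.apache.flink.table.functions.aggfunctions.DecimalSumWithRetractAccumulator
    import org.apache.flink.table.functions.aggfunctions.DecimalSumWithRetractAggFunction

    object RetractRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val agg = new DecimalSumWithRetractAggFunction
        val vals = Seq(new BigDecimal("1"), new BigDecimal("-5"), new BigDecimal("7.25"))

        val acc = agg.createAccumulator().asInstanceOf[DecimalSumWithRetractAccumulator]
        vals.foreach(v => agg.accumulate(acc, v))
        vals.foreach(v => agg.retract(acc, v))

        // After retracting everything that was accumulated, the accumulator should be
        // indistinguishable from a fresh one (compareTo ignores the BigDecimal scale).
        val fresh = agg.createAccumulator().asInstanceOf[DecimalSumWithRetractAccumulator]
        assert(acc.f0.compareTo(fresh.f0) == 0 && acc.f1 == fresh.f1)
      }
    }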

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
index 91cbeea..396be24 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
@@ -61,6 +61,8 @@ abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
     maxVal,
     null.asInstanceOf[T]
   )
+
+  override def supportRetraction: Boolean = false
 }
 
 class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] {
@@ -154,6 +156,8 @@ class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean] {
   )
 
   override def aggregator: AggregateFunction[Boolean] = new BooleanMaxAggFunction()
+
+  override def supportRetraction: Boolean = false
 }
 
 class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
@@ -185,4 +189,6 @@ class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
   )
 
   override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxAggFunction()
+
+  override def supportRetraction: Boolean = false
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..c2329a4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxWithRetractAggFunctionTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+  * Test case for built-in max with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MaxWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  def minVal: T
+
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    Seq(
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      maxVal,
+      numeric.fromInt(-99),
+      numeric.fromInt(3),
+      numeric.fromInt(56),
+      numeric.fromInt(0),
+      minVal,
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T]
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  override def expectedResults: Seq[T] = Seq(
+    maxVal,
+    null.asInstanceOf[T]
+  )
+}
+
+class ByteMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Byte] {
+
+  override def minVal = (Byte.MinValue + 1).toByte
+
+  override def maxVal = (Byte.MaxValue - 1).toByte
+
+  override def aggregator: AggregateFunction[Byte] = new ByteMaxWithRetractAggFunction()
+}
+
+class ShortMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Short] {
+
+  override def minVal = (Short.MinValue + 1).toShort
+
+  override def maxVal = (Short.MaxValue - 1).toShort
+
+  override def aggregator: AggregateFunction[Short] = new ShortMaxWithRetractAggFunction()
+}
+
+class IntMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Int] {
+
+  override def minVal = Int.MinValue + 1
+
+  override def maxVal = Int.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Int] = new IntMaxWithRetractAggFunction()
+}
+
+class LongMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Long] {
+
+  override def minVal = Long.MinValue + 1
+
+  override def maxVal = Long.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Long] = new LongMaxWithRetractAggFunction()
+}
+
+class FloatMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Float] {
+
+  override def minVal = Float.MinValue / 2
+
+  override def maxVal = Float.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Float] = new FloatMaxWithRetractAggFunction()
+}
+
+class DoubleMaxWithRetractAggFunctionTest extends MaxWithRetractAggFunctionTest[Double] {
+
+  override def minVal = Double.MinValue / 2
+
+  override def maxVal = Double.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Double] = new DoubleMaxWithRetractAggFunction()
+}
+
+class BooleanMaxWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] {
+
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    Seq(
+      false,
+      false,
+      false
+    ),
+    Seq(
+      true,
+      true,
+      true
+    ),
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    ),
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    true,
+    null.asInstanceOf[Boolean]
+  )
+
+  override def aggregator: AggregateFunction[Boolean] = new BooleanMaxWithRetractAggFunction()
+}
+
+class DecimalMaxWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("1000.000001"),
+      new BigDecimal("-1"),
+      new BigDecimal("-999.998999"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-999.999"),
+      null,
+      new BigDecimal("999.999")
+    ),
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("1000.000001"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxWithRetractAggFunction()
+}

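[Editor's note] The Max-with-retract tests above only describe inputs and expected results; the retraction mechanics are easiest to see with a counting accumulator. Below is a minimal, illustrative sketch of a max that can retract previously accumulated values. All names (SketchMaxAcc, accumulate, retract, getValue) are hypothetical and do not refer to the MaxWithRetractAggFunction classes added by this commit, whose implementation may differ.

    import scala.collection.immutable.TreeMap

    // Illustrative only: a sorted-map of value -> count supports retraction,
    // because removing the current max exposes the next-largest value that is
    // still accumulated.
    class SketchMaxAcc[T](implicit ord: Ordering[T]) {
      private var counts = TreeMap.empty[T, Int]

      def accumulate(v: T): Unit =
        if (v != null) counts = counts + (v -> (counts.getOrElse(v, 0) + 1))

      def retract(v: T): Unit =
        if (v != null) counts.get(v).foreach { c =>
          counts = if (c <= 1) counts - v else counts + (v -> (c - 1))
        }

      // None once every accumulated value has been retracted (maps to SQL NULL).
      def getValue: Option[T] = counts.lastOption.map(_._1)
    }

    val acc = new SketchMaxAcc[Int]()
    Seq(1, 56, 17).foreach(acc.accumulate)
    acc.retract(56)
    // acc.getValue == Some(17): retracting the current max restores the
    // next-largest value that was accumulated before it.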
http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
index 6a6e5b9..7d9e52b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
@@ -21,7 +21,7 @@ import java.math.BigDecimal
 import org.apache.flink.table.functions.AggregateFunction
 
 /**
-  * Test case for built-in max aggregate function
+  * Test case for built-in min aggregate function
   *
   * @tparam T the type for the aggregation result
   */
@@ -61,6 +61,8 @@ abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
     minVal,
     null.asInstanceOf[T]
   )
+
+  override def supportRetraction: Boolean = false
 }
 
 class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] {
@@ -154,6 +156,8 @@ class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean] {
   )
 
   override def aggregator: AggregateFunction[Boolean] = new BooleanMinAggFunction()
+
+  override def supportRetraction: Boolean = false
 }
 
 class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
@@ -185,4 +189,6 @@ class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
   )
 
   override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinAggFunction()
+
+  override def supportRetraction: Boolean = false
 }

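[Editor's note] The only change to the plain Min tests above is the new supportRetraction flag. The sketch below shows the intent of such a flag: a shared test base can skip its retract checks for functions that do not implement retraction. This is an illustration only; the actual AggFunctionTestBase changes live elsewhere in this commit's diff, and the method names used here (checkRetract, etc.) are hypothetical.

    // Sketch of how a supportRetraction switch can gate the retract test path.
    abstract class SketchAggFunctionTestBase[T] {
      def inputValueSets: Seq[Seq[T]]
      def expectedResults: Seq[T]
      def supportRetraction: Boolean = true   // subclasses opt out

      def checkRetract(): Unit = {
        if (supportRetraction) {
          // accumulate every input, then retract every input, and assert the
          // accumulator equals a freshly created (empty) one
        }
        // when supportRetraction is false (as in the plain Min tests above),
        // the retract path is skipped entirely
      }
    }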
http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..2ef6b67
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinWithRetractAggFunctionTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+  * Test case for built-in Min with retraction aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class MinWithRetractAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  def minVal: T
+
+  def maxVal: T
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    Seq(
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      maxVal,
+      numeric.fromInt(-99),
+      numeric.fromInt(3),
+      numeric.fromInt(56),
+      numeric.fromInt(0),
+      minVal,
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T]
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  override def expectedResults: Seq[T] = Seq(
+    minVal,
+    null.asInstanceOf[T]
+  )
+}
+
+class ByteMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Byte] {
+
+  override def minVal = (Byte.MinValue + 1).toByte
+
+  override def maxVal = (Byte.MaxValue - 1).toByte
+
+  override def aggregator: AggregateFunction[Byte] = new ByteMinWithRetractAggFunction()
+}
+
+class ShortMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Short] {
+
+  override def minVal = (Short.MinValue + 1).toShort
+
+  override def maxVal = (Short.MaxValue - 1).toShort
+
+  override def aggregator: AggregateFunction[Short] = new ShortMinWithRetractAggFunction()
+}
+
+class IntMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Int] {
+
+  override def minVal = Int.MinValue + 1
+
+  override def maxVal = Int.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Int] = new IntMinWithRetractAggFunction()
+}
+
+class LongMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Long] {
+
+  override def minVal = Long.MinValue + 1
+
+  override def maxVal = Long.MaxValue - 1
+
+  override def aggregator: AggregateFunction[Long] = new LongMinWithRetractAggFunction()
+}
+
+class FloatMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Float] {
+
+  override def minVal = Float.MinValue / 2
+
+  override def maxVal = Float.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Float] = new FloatMinWithRetractAggFunction()
+}
+
+class DoubleMinWithRetractAggFunctionTest extends MinWithRetractAggFunctionTest[Double] {
+
+  override def minVal = Double.MinValue / 2
+
+  override def maxVal = Double.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Double] = new DoubleMinWithRetractAggFunction()
+}
+
+class BooleanMinWithRetractAggFunctionTest extends AggFunctionTestBase[Boolean] {
+
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    Seq(
+      false,
+      false,
+      false
+    ),
+    Seq(
+      true,
+      true,
+      true
+    ),
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    ),
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    false,
+    null.asInstanceOf[Boolean]
+  )
+
+  override def aggregator: AggregateFunction[Boolean] = new BooleanMinWithRetractAggFunction()
+}
+
+class DecimalMinWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("1000"),
+      new BigDecimal("-1"),
+      new BigDecimal("-999.998999"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-999.999"),
+      null,
+      new BigDecimal("999.999")
+    ),
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("-999.999"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinWithRetractAggFunction()
+}

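[Editor's note] In the boolean case above, false orders below true, so the mixed input set yields false as its minimum. A retract-capable boolean min only needs to count how many true and how many false values are currently accumulated, as in the illustrative sketch below. Names are hypothetical; the commit's BooleanMinWithRetractAggFunction may be implemented differently.

    // Illustrative only: with false < true, any accumulated false wins the min.
    class SketchBooleanMinAcc {
      private var trues = 0L
      private var falses = 0L

      def accumulate(v: java.lang.Boolean): Unit =
        if (v != null) { if (v) trues += 1 else falses += 1 }

      def retract(v: java.lang.Boolean): Unit =
        if (v != null) { if (v) trues -= 1 else falses -= 1 }

      def getValue: java.lang.Boolean =
        if (falses > 0) java.lang.Boolean.FALSE
        else if (trues > 0) java.lang.Boolean.TRUE
        else null   // all inputs were null, or everything was retracted
    }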
http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
index 95feddd..cd69187 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
@@ -63,6 +63,8 @@ abstract class SumAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T]
     numeric.fromInt(2),
     null.asInstanceOf[T]
   )
+
+  override def supportRetraction: Boolean = false
 }
 
 class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] {
@@ -142,6 +144,8 @@ class DecimalSumAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
   )
 
   override def aggregator: AggregateFunction[BigDecimal] = new DecimalSumAggFunction()
+
+  override def supportRetraction: Boolean = false
 }
 
 

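[Editor's note] The plain Sum tests above only gain supportRetraction = false overrides. A sum that does support retraction additionally has to remember how many non-null values are currently accumulated, so it can distinguish "the sum is 0" from "nothing is accumulated" and fall back to NULL once everything has been retracted. The sketch below illustrates that bookkeeping; names are hypothetical and the commit's SumWithRetractAggFunction may differ in structure.

    // Illustrative only: running sum plus a count of non-null values.
    class SketchLongSumAcc {
      private var sum = 0L
      private var count = 0L

      def accumulate(v: java.lang.Long): Unit =
        if (v != null) { sum += v; count += 1 }

      def retract(v: java.lang.Long): Unit =
        if (v != null) { sum -= v; count -= 1 }

      def getValue: java.lang.Long =
        if (count == 0L) null else java.lang.Long.valueOf(sum)
    }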
http://git-wip-us.apache.org/repos/asf/flink/blob/cd801aa5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
new file mode 100644
index 0000000..72af358
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunctionTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+  * Test case for built-in sum with retract aggregate function
+  *
+  * @tparam T the type for the aggregation result
+  */
+abstract class SumWithRetractAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] {
+
+  private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+  def maxVal: T
+
+  private val minVal = numeric.negate(maxVal)
+
+  override def inputValueSets: Seq[Seq[T]] = Seq(
+    Seq(
+      minVal,
+      numeric.fromInt(1),
+      null.asInstanceOf[T],
+      numeric.fromInt(2),
+      numeric.fromInt(3),
+      numeric.fromInt(4),
+      numeric.fromInt(5),
+      numeric.fromInt(-10),
+      numeric.fromInt(-20),
+      numeric.fromInt(17),
+      null.asInstanceOf[T],
+      maxVal
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
+    )
+  )
+
+  override def expectedResults: Seq[T] = Seq(
+    numeric.fromInt(2),
+    null.asInstanceOf[T]
+  )
+}
+
+class ByteSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Byte] {
+
+  override def maxVal = (Byte.MaxValue / 2).toByte
+
+  override def aggregator: AggregateFunction[Byte] = new ByteSumWithRetractAggFunction
+}
+
+class ShortSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Short] {
+
+  override def maxVal = (Short.MaxValue / 2).toShort
+
+  override def aggregator: AggregateFunction[Short] = new ShortSumWithRetractAggFunction
+}
+
+class IntSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Int] {
+
+  override def maxVal = Int.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Int] = new IntSumWithRetractAggFunction
+}
+
+class LongSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Long] {
+
+  override def maxVal = Long.MaxValue / 2
+
+  override def aggregator: AggregateFunction[Long] = new LongSumWithRetractAggFunction
+}
+
+class FloatSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Float] {
+
+  override def maxVal = 12345.6789f
+
+  override def aggregator: AggregateFunction[Float] = new FloatSumWithRetractAggFunction
+}
+
+class DoubleSumWithRetractAggFunctionTest extends SumWithRetractAggFunctionTestBase[Double] {
+
+  override def maxVal = 12345.6789d
+
+  override def aggregator: AggregateFunction[Double] = new DoubleSumWithRetractAggFunction
+}
+
+
+class DecimalSumWithRetractAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new BigDecimal("1"),
+      new BigDecimal("2"),
+      new BigDecimal("3"),
+      null,
+      new BigDecimal("0"),
+      new BigDecimal("-1000"),
+      new BigDecimal("0.000000000002"),
+      new BigDecimal("1000"),
+      new BigDecimal("-0.000000000001"),
+      new BigDecimal("999.999"),
+      null,
+      new BigDecimal("4"),
+      new BigDecimal("-999.999"),
+      null
+    ),
+    Seq(
+      null,
+      null,
+      null,
+      null,
+      null
+    )
+  )
+
+  override def expectedResults: Seq[BigDecimal] = Seq(
+    new BigDecimal("10.000000000001"),
+    null
+  )
+
+  override def aggregator: AggregateFunction[BigDecimal] = new DecimalSumWithRetractAggFunction()
+}
+
+


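[Editor's note] The expected results above follow directly from the inputs: in the numeric case minVal and maxVal cancel and 1 + 2 + 3 + 4 + 5 - 10 - 20 + 17 = 2, and in the decimal case the non-null inputs sum to 10.000000000001. A standalone check of the decimal arithmetic (not part of the commit):

    import java.math.BigDecimal

    // Sum of the non-null decimal inputs used in DecimalSumWithRetractAggFunctionTest.
    val inputs = Seq("1", "2", "3", "0", "-1000", "0.000000000002", "1000",
      "-0.000000000001", "999.999", "4", "-999.999")
    val total = inputs.map(new BigDecimal(_)).reduce(_.add(_))
    println(total)   // prints 10.000000000001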