flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-5927] [table] Remove old Aggregate interface, built-in functions, and tests.
Date Fri, 03 Mar 2017 13:27:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master 050f9a416 -> c31f95cab


[FLINK-5927] [table] Remove old Aggregate interface, built-in functions, and tests.

This closes #3465.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c31f95ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c31f95ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c31f95ca

Branch: refs/heads/master
Commit: c31f95cab884452dba47306c2b7fb536f047b8ae
Parents: 14fab4c
Author: shaoxuan-wang <wshaoxuan@gmail.com>
Authored: Fri Mar 3 15:05:00 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Mar 3 14:27:08 2017 +0100

----------------------------------------------------------------------
 .../table/runtime/aggregate/Aggregate.scala     |  96 ------
 .../table/runtime/aggregate/AvgAggregate.scala  | 296 -------------------
 .../runtime/aggregate/CountAggregate.scala      |  55 ----
 .../table/runtime/aggregate/MaxAggregate.scala  | 171 -----------
 .../table/runtime/aggregate/MinAggregate.scala  | 171 -----------
 .../table/runtime/aggregate/SumAggregate.scala  | 131 --------
 .../runtime/aggregate/AggregateTestBase.scala   | 111 -------
 .../runtime/aggregate/AvgAggregateTest.scala    | 154 ----------
 .../runtime/aggregate/CountAggregateTest.scala  |  31 --
 .../runtime/aggregate/MaxAggregateTest.scala    | 177 -----------
 .../runtime/aggregate/MinAggregateTest.scala    | 177 -----------
 .../runtime/aggregate/SumAggregateTest.scala    | 137 ---------
 12 files changed, 1707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
deleted file mode 100644
index a614783..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-
-/**
- * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
- * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
- * -- In Map phase, use prepare() to transform aggregate field value into intermediate
- * aggregate value.
- * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
- * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
- * For associative decomposable aggregate functions, they support partial aggregate. To optimize
- * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
- * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
- * into aggregate buffer.
- *
- * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
- * field index in Row, so different aggregate functions could share the same Row as intermediate
- * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
- * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
- * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
- *
- * @tparam T Aggregated value type.
- */
-trait Aggregate[T] extends Serializable {
-
-  /**
-    * Transform the aggregate field value into intermediate aggregate data.
-    *
-    * @param value The value to insert into the intermediate aggregate row.
-    * @param intermediate The intermediate aggregate row into which the value is inserted.
-    */
-  def prepare(value: Any, intermediate: Row): Unit
-
-  /**
-    * Initiate the intermediate aggregate value in Row.
-    *
-    * @param intermediate The intermediate aggregate row to initiate.
-    */
-  def initiate(intermediate: Row): Unit
-
-  /**
-    * Merge intermediate aggregate data into aggregate buffer.
-    *
-    * @param intermediate The intermediate aggregate row to merge.
-    * @param buffer The aggregate buffer into which the intermedidate is merged.
-    */
-  def merge(intermediate: Row, buffer: Row): Unit
-
-  /**
-    * Calculate the final aggregated result based on aggregate buffer.
-    *
-    * @param buffer The aggregate buffer from which the final aggregate is computed.
-    * @return The final result of the aggregate.
-    */
-  def evaluate(buffer: Row): T
-
-  /**
-    * Intermediate aggregate value types.
-    *
-    * @return The types of the intermediate fields of this aggregate.
-    */
-  def intermediateDataType: Array[TypeInformation[_]]
-
-  /**
-    * Set the aggregate data offset in Row.
-    *
-    * @param aggOffset The offset of this aggregate in the intermediate aggregate rows.
-    */
-  def setAggOffsetInRow(aggOffset: Int)
-
-  /**
-    * Whether aggregate function support partial aggregate.
-    *
-    * @return True if the aggregate supports partial aggregation, False otherwise.
-    */
-  def supportPartial: Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
deleted file mode 100644
index cb94ca1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import com.google.common.math.LongMath
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-import java.math.BigDecimal
-import java.math.BigInteger
-
-abstract class AvgAggregate[T] extends Aggregate[T] {
-  protected var partialSumIndex: Int = _
-  protected var partialCountIndex: Int = _
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    partialSumIndex = aggOffset
-    partialCountIndex = aggOffset + 1
-  }
-}
-
-abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0L)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, 0L)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer : Row): T = {
-    doEvaluate(buffer).asInstanceOf[T]
-  }
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  def doPrepare(value: Any, partial: Row): Unit
-
-  def doEvaluate(buffer: Row): Any
-}
-
-class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Byte]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toByte
-    }
-  }
-}
-
-class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Short]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toShort
-    }
-  }
-}
-
-class IntAvgAggregate extends IntegralAvgAggregate[Int] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Int]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toInt
-    }
-  }
-}
-
-class LongAvgAggregate extends IntegralAvgAggregate[Long] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_INT_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigInteger.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, BigInteger.ZERO)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Long]
-    partial.setField(partialSumIndex, BigInteger.valueOf(input))
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
-    }
-  }
-}
-
-abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0D)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, 0D)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-
-    buffer.setField(partialSumIndex, partialSum + bufferSum)
-    buffer.setField(partialCountIndex, partialCount + bufferCount)
-  }
-
-  override def evaluate(buffer : Row): T = {
-    doEvaluate(buffer).asInstanceOf[T]
-  }
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.DOUBLE_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  def doPrepare(value: Any, partial: Row): Unit
-
-  def doEvaluate(buffer: Row): Any
-}
-
-class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Float]
-    partial.setField(partialSumIndex, input.toDouble)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toFloat
-    }
-  }
-}
-
-class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Double]
-    partial.setField(partialSumIndex, input)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount)
-    }
-  }
-}
-
-class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_DEC_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigDecimal.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[BigDecimal]
-      partial.setField(partialSumIndex, input)
-      partial.setField(partialCountIndex, 1L)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount != 0) {
-      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-      bufferSum.divide(BigDecimal.valueOf(bufferCount))
-    } else {
-      null.asInstanceOf[BigDecimal]
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
deleted file mode 100644
index ea8e1d8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.types.Row
-
-class CountAggregate extends Aggregate[Long] {
-  private var countIndex: Int = _
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(countIndex, 0L)
-  }
-
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
-    buffer.setField(countIndex, partialCount + bufferCount)
-  }
-
-  override def evaluate(buffer: Row): Long = {
-    buffer.getField(countIndex).asInstanceOf[Long]
-  }
-
-  override def prepare(value: Any, intermediate: Row): Unit = {
-    if (value == null) {
-      intermediate.setField(countIndex, 0L)
-    } else {
-      intermediate.setField(countIndex, 1L)
-    }
-  }
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggIndex: Int): Unit = {
-    countIndex = aggIndex
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
deleted file mode 100644
index 34b25e0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
-  protected var maxIndex = -1
-
-  /**
-   * Initiate the intermediate aggregate value in Row.
-   *
-   * @param intermediate The intermediate aggregate row to initiate.
-   */
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, null)
-  }
-
-  /**
-   * Accessed in MapFunction, prepare the input of partial aggregate.
-   *
-   * @param value
-   * @param intermediate
-   */
-  override def prepare(value: Any, intermediate: Row): Unit = {
-    if (value == null) {
-      initiate(intermediate)
-    } else {
-      intermediate.setField(maxIndex, value)
-    }
-  }
-
-  /**
-   * Accessed in CombineFunction and GroupReduceFunction, merge partial
-   * aggregate result into aggregate buffer.
-   *
-   * @param intermediate
-   * @param buffer
-   */
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
-        buffer.setField(maxIndex, max)
-      } else {
-        buffer.setField(maxIndex, partialValue)
-      }
-    }
-  }
-
-  /**
-   * Return the final aggregated result based on aggregate buffer.
-   *
-   * @param buffer
-   * @return
-   */
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(maxIndex).asInstanceOf[T]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    maxIndex = aggOffset
-  }
-}
-
-class ByteMaxAggregate extends MaxAggregate[Byte] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-
-}
-
-class ShortMaxAggregate extends MaxAggregate[Short] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-
-}
-
-class IntMaxAggregate extends MaxAggregate[Int] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-
-}
-
-class LongMaxAggregate extends MaxAggregate[Long] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-}
-
-class FloatMaxAggregate extends MaxAggregate[Float] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-
-}
-
-class DoubleMaxAggregate extends MaxAggregate[Double] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-
-}
-
-class BooleanMaxAggregate extends MaxAggregate[Boolean] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-
-}
-
-class DecimalMaxAggregate extends Aggregate[BigDecimal] {
-
-  protected var minIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, null)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(minIndex, value)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
-        buffer.setField(minIndex, min)
-      } else {
-        buffer.setField(minIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(minIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    minIndex = aggOffset
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
deleted file mode 100644
index 88cb058..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
-  protected var minIndex: Int = _
-
-  /**
-   * Initiate the intermediate aggregate value in Row.
-   *
-   * @param intermediate The intermediate aggregate row to initiate.
-   */
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, null)
-  }
-
-  /**
-   * Accessed in MapFunction, prepare the input of partial aggregate.
-   *
-   * @param value
-   * @param partial
-   */
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(minIndex, value)
-    }
-  }
-
-  /**
-   * Accessed in CombineFunction and GroupReduceFunction, merge partial
-   * aggregate result into aggregate buffer.
-   *
-   * @param partial
-   * @param buffer
-   */
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(minIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
-        buffer.setField(minIndex, min)
-      } else {
-        buffer.setField(minIndex, partialValue)
-      }
-    }
-  }
-
-  /**
-   * Return the final aggregated result based on aggregate buffer.
-   *
-   * @param buffer
-   * @return
-   */
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(minIndex).asInstanceOf[T]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    minIndex = aggOffset
-  }
-}
-
-class ByteMinAggregate extends MinAggregate[Byte] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-
-}
-
-class ShortMinAggregate extends MinAggregate[Short] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-
-}
-
-class IntMinAggregate extends MinAggregate[Int] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-
-}
-
-class LongMinAggregate extends MinAggregate[Long] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-}
-
-class FloatMinAggregate extends MinAggregate[Float] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-
-}
-
-class DoubleMinAggregate extends MinAggregate[Double] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-
-}
-
-class BooleanMinAggregate extends MinAggregate[Boolean] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-
-}
-
-class DecimalMinAggregate extends Aggregate[BigDecimal] {
-
-  protected var minIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, null)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(minIndex, value)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
-        buffer.setField(minIndex, min)
-      } else {
-        buffer.setField(minIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(minIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    minIndex = aggOffset
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
deleted file mode 100644
index cd88112..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class SumAggregate[T: Numeric]
-  extends Aggregate[T] {
-
-  private val numeric = implicitly[Numeric[T]]
-  protected var sumIndex: Int = _
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(sumIndex, null)
-  }
-
-  override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
-      } else {
-        buffer.setField(sumIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(sumIndex).asInstanceOf[T]
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[T]
-      partial.setField(sumIndex, input)
-    }
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    sumIndex = aggOffset
-  }
-}
-
-class ByteSumAggregate extends SumAggregate[Byte] {
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-}
-
-class ShortSumAggregate extends SumAggregate[Short] {
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-}
-
-class IntSumAggregate extends SumAggregate[Int] {
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-}
-
-class LongSumAggregate extends SumAggregate[Long] {
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-}
-
-class FloatSumAggregate extends SumAggregate[Float] {
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-}
-
-class DoubleSumAggregate extends SumAggregate[Double] {
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-}
-
-class DecimalSumAggregate extends Aggregate[BigDecimal] {
-
-  protected var sumIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(sumIndex, null)
-  }
-
-  override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        buffer.setField(sumIndex, partialValue.add(bufferValue))
-      } else {
-        buffer.setField(sumIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(sumIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[BigDecimal]
-      partial.setField(sumIndex, input)
-    }
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    sumIndex = aggOffset
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala
deleted file mode 100644
index 0ca101d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AggregateTestBase.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.types.Row
-
-import org.junit.Test
-import org.junit.Assert.assertEquals
-
-abstract class AggregateTestBase[T] {
-
-  private val offset = 2
-  private val rowArity: Int = offset + aggregator.intermediateDataType.length
-
-  def inputValueSets: Seq[Seq[_]]
-
-  def expectedResults: Seq[T]
-
-  def aggregator: Aggregate[T]
-
-  private def createAggregator(): Aggregate[T] = {
-    val agg = aggregator
-    agg.setAggOffsetInRow(offset)
-    agg
-  }
-
-  private def createRow(): Row = {
-    new Row(rowArity)
-  }
-
-  @Test
-  def testAggregate(): Unit = {
-
-    // iterate over input sets
-    for((vals, expected) <- inputValueSets.zip(expectedResults)) {
-
-      // prepare mapper
-      val rows: Seq[Row] = prepare(vals)
-
-      val result = if (aggregator.supportPartial) {
-        // test with combiner
-        val (firstVals, secondVals) = rows.splitAt(rows.length / 2)
-        val combined = partialAgg(firstVals) :: partialAgg(secondVals) :: Nil
-        finalAgg(combined)
-
-      } else {
-        // test without combiner
-        finalAgg(rows)
-      }
-
-      (expected, result) match {
-        case (e: BigDecimal, r: BigDecimal) =>
-          // BigDecimal.equals() value and scale but we are only interested in value.
-          assert(e.compareTo(r) == 0)
-        case _ =>
-          assertEquals(expected, result)
-      }
-    }
-  }
-
-  private def prepare(vals: Seq[_]): Seq[Row] = {
-
-    val agg = createAggregator()
-
-    vals.map { v =>
-      val row = createRow()
-      agg.prepare(v, row)
-      row
-    }
-  }
-
-  private def partialAgg(rows: Seq[Row]): Row = {
-
-    val agg = createAggregator()
-    val aggBuf = createRow()
-
-    agg.initiate(aggBuf)
-    rows.foreach(v => agg.merge(v, aggBuf))
-
-    aggBuf
-  }
-
-  private def finalAgg(rows: Seq[Row]): T = {
-
-    val agg = createAggregator()
-    val aggBuf = createRow()
-
-    agg.initiate(aggBuf)
-    rows.foreach(v => agg.merge(v, aggBuf))
-
-    agg.evaluate(partialAgg(rows))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala
deleted file mode 100644
index a72d08b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/AvgAggregateTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      minVal,
-      null.asInstanceOf[T],
-      minVal,
-      minVal,
-      null.asInstanceOf[T],
-      minVal,
-      minVal,
-      minVal
-    ),
-    Seq(
-      maxVal,
-      maxVal,
-      null.asInstanceOf[T],
-      maxVal,
-      maxVal,
-      null.asInstanceOf[T],
-      maxVal,
-      maxVal,
-      maxVal
-    ),
-    Seq(
-      minVal,
-      maxVal,
-      null.asInstanceOf[T],
-      numeric.fromInt(0),
-      numeric.negate(maxVal),
-      numeric.negate(minVal),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    minVal,
-    maxVal,
-    numeric.fromInt(0),
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteAvgAggregateTest extends AvgAggregateTestBase[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator = new ByteAvgAggregate()
-}
-
-class ShortAvgAggregateTest extends AvgAggregateTestBase[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator = new ShortAvgAggregate()
-}
-
-class IntAvgAggregateTest extends AvgAggregateTestBase[Int] {
-
-  override def minVal = Int.MinValue + 1
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator = new IntAvgAggregate()
-}
-
-class LongAvgAggregateTest extends AvgAggregateTestBase[Long] {
-
-  override def minVal = Long.MinValue + 1
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator = new LongAvgAggregate()
-}
-
-class FloatAvgAggregateTest extends AvgAggregateTestBase[Float] {
-
-  override def minVal = Float.MinValue
-  override def maxVal = Float.MaxValue
-
-  override def aggregator = new FloatAvgAggregate()
-}
-
-class DoubleAvgAggregateTest extends AvgAggregateTestBase[Double] {
-
-  override def minVal = Float.MinValue
-  override def maxVal = Float.MaxValue
-
-  override def aggregator = new DoubleAvgAggregate()
-}
-
-class DecimalAvgAggregateTest extends AggregateTestBase[BigDecimal] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("987654321000000"),
-      new BigDecimal("-0.000000000012345"),
-      null,
-      new BigDecimal("0.000000000012345"),
-      new BigDecimal("-987654321000000"),
-      null,
-      new BigDecimal("0")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    BigDecimal.ZERO,
-    null
-  )
-
-  override def aggregator: Aggregate[BigDecimal] = new DecimalAvgAggregate()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala
deleted file mode 100644
index 55f73b4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/CountAggregateTest.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-class CountAggregateTest extends AggregateTestBase[Long] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq("a", "b", null, "c", null, "d", "e", null, "f"),
-    Seq(null, null, null, null, null, null)
-  )
-
-  override def expectedResults: Seq[Long] = Seq(6L, 0L)
-
-  override def aggregator: Aggregate[Long] = new CountAggregate()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala
deleted file mode 100644
index 1bf879d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MaxAggregateTest.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    maxVal,
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: Aggregate[Byte] = new ByteMaxAggregate()
-}
-
-class ShortMaxAggregateTest extends MaxAggregateTestBase[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: Aggregate[Short] = new ShortMaxAggregate()
-}
-
-class IntMaxAggregateTest extends MaxAggregateTestBase[Int] {
-
-  override def minVal = Int.MinValue + 1
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: Aggregate[Int] = new IntMaxAggregate()
-}
-
-class LongMaxAggregateTest extends MaxAggregateTestBase[Long] {
-
-  override def minVal = Long.MinValue + 1
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: Aggregate[Long] = new LongMaxAggregate()
-}
-
-class FloatMaxAggregateTest extends MaxAggregateTestBase[Float] {
-
-  override def minVal = Float.MinValue / 2
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: Aggregate[Float] = new FloatMaxAggregate()
-}
-
-class DoubleMaxAggregateTest extends MaxAggregateTestBase[Double] {
-
-  override def minVal = Double.MinValue / 2
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: Aggregate[Double] = new DoubleMaxAggregate()
-}
-
-class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    true,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate()
-}
-
-class DecimalMaxAggregateTest extends AggregateTestBase[BigDecimal] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000.000001"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("1000.000001"),
-    null
-  )
-
-  override def aggregator: Aggregate[BigDecimal] = new DecimalMaxAggregate()
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala
deleted file mode 100644
index 3e2404d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/MinAggregateTest.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      maxVal,
-      numeric.fromInt(-99),
-      numeric.fromInt(3),
-      numeric.fromInt(56),
-      numeric.fromInt(0),
-      minVal,
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    minVal,
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteMinAggregateTest extends MinAggregateTestBase[Byte] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator: Aggregate[Byte] = new ByteMinAggregate()
-}
-
-class ShortMinAggregateTest extends MinAggregateTestBase[Short] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator: Aggregate[Short] = new ShortMinAggregate()
-}
-
-class IntMinAggregateTest extends MinAggregateTestBase[Int] {
-
-  override def minVal = Int.MinValue + 1
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator: Aggregate[Int] = new IntMinAggregate()
-}
-
-class LongMinAggregateTest extends MinAggregateTestBase[Long] {
-
-  override def minVal = Long.MinValue + 1
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator: Aggregate[Long] = new LongMinAggregate()
-}
-
-class FloatMinAggregateTest extends MinAggregateTestBase[Float] {
-
-  override def minVal = Float.MinValue / 2
-  override def maxVal = Float.MaxValue / 2
-
-  override def aggregator: Aggregate[Float] = new FloatMinAggregate()
-}
-
-class DoubleMinAggregateTest extends MinAggregateTestBase[Double] {
-
-  override def minVal = Double.MinValue / 2
-  override def maxVal = Double.MaxValue / 2
-
-  override def aggregator: Aggregate[Double] = new DoubleMinAggregate()
-}
-
-class BooleanMinAggregateTest extends AggregateTestBase[Boolean] {
-
-  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
-    Seq(
-      false,
-      false,
-      false
-    ),
-    Seq(
-      true,
-      true,
-      true
-    ),
-    Seq(
-      true,
-      false,
-      null.asInstanceOf[Boolean],
-      true,
-      false,
-      true,
-      null.asInstanceOf[Boolean]
-    ),
-    Seq(
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean],
-      null.asInstanceOf[Boolean]
-    )
-  )
-
-  override def expectedResults: Seq[Boolean] = Seq(
-    false,
-    true,
-    false,
-    null.asInstanceOf[Boolean]
-  )
-
-  override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate()
-}
-
-class DecimalMinAggregateTest extends AggregateTestBase[BigDecimal] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("1000"),
-      new BigDecimal("-1"),
-      new BigDecimal("-999.998999"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-999.999"),
-      null,
-      new BigDecimal("999.999")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("-999.999"),
-    null
-  )
-
-  override def aggregator: Aggregate[BigDecimal] = new DecimalMinAggregate()
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c31f95ca/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala
deleted file mode 100644
index c085334..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/SumAggregateTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def maxVal: T
-  private val minVal = numeric.negate(maxVal)
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      numeric.fromInt(1),
-      null.asInstanceOf[T],
-      numeric.fromInt(2),
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      numeric.fromInt(-10),
-      numeric.fromInt(-20),
-      numeric.fromInt(17),
-      null.asInstanceOf[T],
-      maxVal
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    numeric.fromInt(2),
-    null.asInstanceOf[T]
-  )
-}
-
-class ByteSumAggregateTest extends SumAggregateTestBase[Byte] {
-
-  override def maxVal = (Byte.MaxValue / 2).toByte
-
-  override def aggregator: Aggregate[Byte] = new ByteSumAggregate
-}
-
-class ShortSumAggregateTest extends SumAggregateTestBase[Short] {
-
-  override def maxVal = (Short.MaxValue / 2).toShort
-
-  override def aggregator: Aggregate[Short] = new ShortSumAggregate
-}
-
-class IntSumAggregateTest extends SumAggregateTestBase[Int] {
-
-  override def maxVal = Int.MaxValue / 2
-
-  override def aggregator: Aggregate[Int] = new IntSumAggregate
-}
-
-class LongSumAggregateTest extends SumAggregateTestBase[Long] {
-
-  override def maxVal = Long.MaxValue / 2
-
-  override def aggregator: Aggregate[Long] = new LongSumAggregate
-}
-
-class FloatSumAggregateTest extends SumAggregateTestBase[Float] {
-
-  override def maxVal = 12345.6789f
-
-  override def aggregator: Aggregate[Float] = new FloatSumAggregate
-}
-
-class DoubleSumAggregateTest extends SumAggregateTestBase[Double] {
-
-  override def maxVal = 12345.6789d
-
-  override def aggregator: Aggregate[Double] = new DoubleSumAggregate
-}
-
-class DecimalSumAggregateTest extends AggregateTestBase[BigDecimal] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("1"),
-      new BigDecimal("2"),
-      new BigDecimal("3"),
-      null,
-      new BigDecimal("0"),
-      new BigDecimal("-1000"),
-      new BigDecimal("0.000000000002"),
-      new BigDecimal("1000"),
-      new BigDecimal("-0.000000000001"),
-      new BigDecimal("999.999"),
-      null,
-      new BigDecimal("4"),
-      new BigDecimal("-999.999"),
-      null
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    new BigDecimal("10.000000000001"),
-    null
-  )
-
-  override def aggregator: Aggregate[BigDecimal] = new DecimalSumAggregate()
-}


Mime
View raw message