flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean.
Date Thu, 26 May 2016 15:15:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master ec38a212e -> ef5832d8f


[FLINK-3936] [tableAPI] Add MIN/MAX aggregation for Boolean.

This closes #2035


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5784f395
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5784f395
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5784f395

Branch: refs/heads/master
Commit: 5784f395536d8a5e6f7d8bfd28bd9c8c0ed99b18
Parents: ec38a21
Author: Fabian Hueske <fhueske@apache.org>
Authored: Sun May 22 15:46:06 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu May 26 12:16:16 2016 +0200

----------------------------------------------------------------------
 .../table/runtime/aggregate/AggregateUtil.scala |  4 +++
 .../table/runtime/aggregate/MaxAggregate.scala  | 15 ++++++++--
 .../table/runtime/aggregate/MinAggregate.scala  | 15 ++++++++--
 .../runtime/aggregate/MaxAggregateTest.scala    | 29 ++++++++++++++++++++
 .../runtime/aggregate/MinAggregateTest.scala    | 29 ++++++++++++++++++++
 5 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5784f395/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 8222a2e..82364eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -192,6 +192,8 @@ object AggregateUtil {
                 new FloatMinAggregate
               case DOUBLE =>
                 new DoubleMinAggregate
+              case BOOLEAN =>
+                new BooleanMinAggregate
               case sqlType: SqlTypeName =>
                 throw new TableException("Min aggregate does no support type:" + sqlType)
             }
@@ -209,6 +211,8 @@ object AggregateUtil {
                 new FloatMaxAggregate
               case DOUBLE =>
                 new DoubleMaxAggregate
+              case BOOLEAN =>
+                new BooleanMaxAggregate
               case sqlType: SqlTypeName =>
                 throw new TableException("Max aggregate does no support type:" + sqlType)
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/5784f395/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 8f491f2..9ad0468 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.Row
 
-abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
+abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
 
-  private val numeric = implicitly[Numeric[T]]
   protected var maxIndex = -1
 
   /**
@@ -49,7 +48,8 @@ abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
   override def merge(intermediate: Row, buffer: Row): Unit = {
     val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
     val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
-    buffer.setField(maxIndex, numeric.max(partialValue, bufferValue))
+    val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
+    buffer.setField(maxIndex, max)
   }
 
   /**
@@ -122,3 +122,12 @@ class DoubleMaxAggregate extends MaxAggregate[Double] {
     intermediate.setField(maxIndex, Double.MinValue)
   }
 }
+
+class BooleanMaxAggregate extends MaxAggregate[Boolean] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(maxIndex, false)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5784f395/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index e78fb00..b607e6b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.Row
 
-abstract  class MinAggregate[T: Numeric] extends Aggregate[T]{
+abstract  class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
 
-  private val numeric = implicitly[Numeric[T]]
   protected var minIndex: Int = _
 
   /**
@@ -49,7 +48,8 @@ abstract  class MinAggregate[T: Numeric] extends Aggregate[T]{
   override def merge(partial: Row, buffer: Row): Unit = {
     val partialValue = partial.productElement(minIndex).asInstanceOf[T]
     val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
-    buffer.setField(minIndex, numeric.min(partialValue, bufferValue))
+    val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
+    buffer.setField(minIndex, min)
   }
 
   /**
@@ -122,3 +122,12 @@ class DoubleMinAggregate extends MinAggregate[Double] {
     intermediate.setField(minIndex, Double.MaxValue)
   }
 }
+
+class BooleanMinAggregate extends MinAggregate[Boolean] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(minIndex, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5784f395/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
index f3951e4..e049e49 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
@@ -91,3 +91,32 @@ class DoubleMaxAggregateTest extends MaxAggregateTestBase[Double] {
 
   override def aggregator: Aggregate[Double] = new DoubleMaxAggregate()
 }
+
+class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] {
+
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    Seq(
+      false,
+      false,
+      false
+    ),
+    Seq(
+      true,
+      true,
+      true
+    ),
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  override def expectedResults: Seq[Boolean] = Seq(false, true, true)
+
+  override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5784f395/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
index 3a4b111..7cf7bb1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
@@ -91,3 +91,32 @@ class DoubleMinAggregateTest extends MinAggregateTestBase[Double] {
 
   override def aggregator: Aggregate[Double] = new DoubleMinAggregate()
 }
+
+class BooleanMinAggregateTest extends AggregateTestBase[Boolean] {
+
+  override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+    Seq(
+      false,
+      false,
+      false
+    ),
+    Seq(
+      true,
+      true,
+      true
+    ),
+    Seq(
+      true,
+      false,
+      null.asInstanceOf[Boolean],
+      true,
+      false,
+      true,
+      null.asInstanceOf[Boolean]
+    )
+  )
+
+  override def expectedResults: Seq[Boolean] = Seq(false, true, false)
+
+  override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate()
+}


Mime
View raw message