flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-6124] [table] support max/min aggregations for string type
Date Tue, 21 Mar 2017 11:22:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 86d32ac84 -> 17dd915e8


[FLINK-6124] [table] support max/min aggregations for string type

This closes #3579.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17dd915e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17dd915e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17dd915e

Branch: refs/heads/master
Commit: 17dd915e8a18e60fa32ada9500d3632ba162720a
Parents: 86d32ac
Author: Zhenghua Gao <docete@gmail.com>
Authored: Mon Mar 20 19:29:31 2017 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Tue Mar 21 12:18:30 2017 +0100

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala |  8 +++++
 .../functions/aggfunctions/MinAggFunction.scala |  8 +++++
 .../table/runtime/aggregate/AggregateUtil.scala |  4 +++
 .../scala/batch/sql/AggregationsITCase.scala    | 31 +++---------------
 .../aggfunctions/MaxAggFunctionTest.scala       | 33 ++++++++++++++++++++
 .../aggfunctions/MinAggFunctionTest.scala       | 33 ++++++++++++++++++++
 6 files changed, 90 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 55e3e5f..3793434 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -155,3 +155,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
   override def getInitValue = BigDecimal.ZERO
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }
+
+/**
+  * Built-in String Max aggregate function
+  */
+class StringMaxAggFunction extends MaxAggFunction[String] {
+  override def getInitValue = "".toString
+  override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index 647388a..41361fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -155,3 +155,11 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] {
   override def getInitValue: BigDecimal = BigDecimal.ZERO
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }
+
+/**
+  * Built-in String Min aggregate function
+  */
+class StringMinAggFunction extends MinAggFunction[String] {
+  override def getInitValue = "".toString
+  override def getValueTypeInfo = BasicTypeInfo.STRING_TYPE_INFO
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index b6b3445..9feec17 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -916,6 +916,8 @@ object AggregateUtil {
                   new DecimalMinAggFunction
                 case BOOLEAN =>
                   new BooleanMinAggFunction
+                case VARCHAR | CHAR =>
+                  new StringMinAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException("Min aggregate does no support type:" + sqlType)
               }
@@ -961,6 +963,8 @@ object AggregateUtil {
                   new DecimalMaxAggFunction
                 case BOOLEAN =>
                   new BooleanMaxAggFunction
+                case VARCHAR | CHAR =>
+                  new StringMaxAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException("Max aggregate does no support type:" + sqlType)
               }

http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
index a60cfaa..cceb272 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -92,36 +92,13 @@ class AggregationsITCase(
   }
 
   @Test
-  def testWorkingAggregationDataTypes(): Unit = {
+  def testAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery =
-      "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7), " +
-      "  sum(CAST(_6 AS DECIMAL))" +
-      "FROM MyTable"
-
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
-    tEnv.registerDataSet("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,1,1,1.5,1.5,2,3.0"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
-      "FROM MyTable"
+    val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g), " +
+      "min(g), min('Ciao'), max(g), max('Ciao'), sum(CAST(f AS DECIMAL)) FROM MyTable"
 
     val ds = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -130,7 +107,7 @@ class AggregationsITCase(
 
     val result = tEnv.sql(sqlQuery)
 
-    val expected = "1,1,1,1,1.5,1.5,2"
+    val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
index 396be24..38ea993 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
@@ -192,3 +192,36 @@ class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
 
   override def supportRetraction: Boolean = false
 }
+
+class StringMaxAggFunctionTest extends AggFunctionTestBase[String] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new String("a"),
+      new String("b"),
+      new String("c"),
+      null.asInstanceOf[String],
+      new String("d")
+    ),
+    Seq(
+      null.asInstanceOf[String],
+      null.asInstanceOf[String],
+      null.asInstanceOf[String]
+    ),
+    Seq(
+      new String("1House"),
+      new String("Household"),
+      new String("house"),
+      new String("household")
+    )
+  )
+
+  override def expectedResults: Seq[String] = Seq(
+    new String("d"),
+    null.asInstanceOf[String],
+    new String("household")
+  )
+
+  override def aggregator: AggregateFunction[String] = new StringMaxAggFunction()
+
+  override def supportRetraction: Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/17dd915e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
index 7d9e52b..84e541a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
@@ -192,3 +192,36 @@ class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
 
   override def supportRetraction: Boolean = false
 }
+
+class StringMinAggFunctionTest extends AggFunctionTestBase[String] {
+  override def inputValueSets: Seq[Seq[_]] = Seq(
+    Seq(
+      new String("a"),
+      new String("b"),
+      new String("c"),
+      null.asInstanceOf[String],
+      new String("d")
+    ),
+    Seq(
+      null.asInstanceOf[String],
+      null.asInstanceOf[String],
+      null.asInstanceOf[String]
+    ),
+    Seq(
+      new String("1House"),
+      new String("Household"),
+      new String("house"),
+      new String("household")
+    )
+  )
+
+  override def expectedResults: Seq[String] = Seq(
+    new String("a"),
+    null.asInstanceOf[String],
+    new String("1House")
+  )
+
+  override def aggregator: AggregateFunction[String] = new StringMinAggFunction()
+
+  override def supportRetraction: Boolean = false
+}


Mime
View raw message