flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-5768] [table] Refactor DataSet and DataStream aggregations to use UDAGG interface.
Date Thu, 02 Mar 2017 20:32:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7fe0eb477 -> 438276de8


http://git-wip-us.apache.org/repos/asf/flink/blob/438276de/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
index 627b25b..5ba3e34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
@@ -51,7 +51,7 @@ abstract class AggFunctionTestBase[T] {
   // test aggregate functions with partial merge
   def testAggregateWithMerge(): Unit = {
 
-    if (ifMethodExitInFunction("merge", aggregator)) {
+    if (ifMethodExistInFunction("merge", aggregator)) {
       // iterate over input sets
       for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
         //equally split the vals sequence into two sequences

http://git-wip-us.apache.org/repos/asf/flink/blob/438276de/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index f13f350..071f0ee 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.runtime.dataset
 
+import java.math.BigDecimal
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
@@ -37,20 +39,22 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
   extends TableProgramsClusterTestBase(configMode) {
 
   val data = List(
-    (1L, 1, "Hi"),
-    (2L, 2, "Hallo"),
-    (3L, 2, "Hello"),
-    (6L, 3, "Hello"),
-    (4L, 5, "Hello"),
-    (16L, 4, "Hello world"),
-    (8L, 3, "Hello world"))
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (6L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"))
 
   @Test(expected = classOf[UnsupportedOperationException])
   def testAllEventTimeTumblingWindowOverCount(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
 
     // Count tumbling non-grouping window on event-time are currently not supported
     table
@@ -65,14 +69,20 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
       .window(Tumble over 2.rows on 'long as 'w)
       .groupBy('w, 'string)
-      .select('string, 'int.sum)
+      .select('string, 'int.sum, 'int.count, 'int.max, 'int.min, 'int.avg,
+              'double.sum, 'double.count, 'double.max, 'double.min, 'double.avg,
+              'float.sum, 'float.count, 'float.max, 'float.min, 'float.avg,
+              'bigdec.sum, 'bigdec.count, 'bigdec.max, 'bigdec.min, 'bigdec.avg)
 
-    val expected = "Hello,7\n" + "Hello world,7\n"
+    val expected = "Hello,7,2,5,2,3,7.0,2,5.0,2.0,3.5,7.0,2,5.0,2.0,3.5,7,2,5,2,3.5\n" +
+      "Hello world,7,2,4,3,3,7.0,2,4.0,3.0,3.5,7.0,2,4.0,3.0,3.5,7,2,4,3,3.5\n"
     val results = windowedTable.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -82,7 +92,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
@@ -105,7 +117,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
       .window(Tumble over 5.milli on 'long as 'w)
@@ -125,7 +139,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
     val windowedTable = table
       .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('string, 'w)
@@ -146,7 +162,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     // Non-grouping Session window on event-time are currently not supported
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
     val windowedTable =table
       .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('w)
@@ -158,7 +176,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
     table
       .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w, 'string)


Mime
View raw message