flink-commits mailing list archives

From fhue...@apache.org
Subject [1/4] flink git commit: [FLINK-3939] [tableAPI] Prevent translation of unsupported distinct aggregates and grouping sets.
Date Mon, 23 May 2016 14:30:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5fdf39b1f -> 9cc629662


[FLINK-3939] [tableAPI] Prevent translation of unsupported distinct aggregates and grouping sets.

This closes #2014
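
For illustration only (not part of the commit), a minimal sketch of a query that now fails plan translation with a PlanGenException, assuming a batch ExecutionEnvironment and the same imports and CollectionDataSets helper used in the new AggregationsITCase tests below:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register the 3-tuple test data set under a table name
    val ds = CollectionDataSets.get3TupleDataSet(env)
    tEnv.registerDataSet("MyTable", ds)

    // DISTINCT aggregates are rejected in DataSetAggregateRule.matches();
    // the TableException is rethrown as a PlanGenException during translation
    tEnv.sql("SELECT count(distinct _3) FROM MyTable").toDataSet[Row]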


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/173d24df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/173d24df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/173d24df

Branch: refs/heads/master
Commit: 173d24dfdd79c497dc31c6f04dcda47a83ede494
Parents: 5fdf39b
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon May 23 14:02:21 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon May 23 14:02:21 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala |  5 ++
 .../rules/dataSet/DataSetAggregateRule.scala    | 49 ++++++++++++++------
 .../scala/batch/sql/AggregationsITCase.scala    | 46 ++++++++++++++++++
 3 files changed, 85 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/173d24df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 207500a..b25c940 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -256,6 +256,11 @@ abstract class BatchTableEnvironment(
           s"Cannot generate a valid execution plan for the given query: \n\n" +
             s"${RelOptUtil.toString(relNode)}\n" +
             "Please consider filing a bug report.", e)
+      case t: TableException =>
+        throw new PlanGenException(
+        s"Cannot generate a valid execution plan for the given query: \n\n" +
+          s"${RelOptUtil.toString(relNode)}\n" +
+          t.msg)
       case a: AssertionError =>
         throw a.getCause
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/173d24df/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 0449fc3..9f78adb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -18,10 +18,11 @@
 
 package org.apache.flink.api.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
 import scala.collection.JavaConversions._
 
@@ -33,20 +34,38 @@ class DataSetAggregateRule
       "DataSetAggregateRule")
   {
 
-    def convert(rel: RelNode): RelNode = {
-      val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
-
-      new DataSetAggregate(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        agg.getNamedAggCalls,
-        rel.getRowType,
-        agg.getInput.getRowType,
-        agg.getGroupSet.toArray)
-      }
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw new TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw new TableException("GROUPING SETS are currently not supported.")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetAggregate(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+    }
   }
 
 object DataSetAggregateRule {

http://git-wip-us.apache.org/repos/asf/flink/blob/173d24df/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
index 49541eb..73c0724 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.{Row, TableEnvironment}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -211,4 +212,49 @@ class AggregationsITCase(
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test(expected = classOf[PlanGenException])
+  def testDistinctAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    // must fail. distinct aggregates are not supported
+    tEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[PlanGenException])
+  def testGroupedDistinctAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY
_2"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    // must fail. distinct aggregates are not supported
+    tEnv.sql(sqlQuery).toDataSet[Row]
+  }
+
+  @Test(expected = classOf[PlanGenException])
+  def testGroupingSetAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT _2, _3, avg(_1) as a FROM MyTable GROUP BY GROUPING SETS (_2,
_3)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    // must fail. grouping sets are not supported
+    tEnv.sql(sqlQuery).toDataSet[Row]
+  }
 }

