flink-commits mailing list archives

From fhue...@apache.org
Subject [2/2] flink git commit: [FLINK-7051] [table] Bump Calcite version to 1.14.
Date Thu, 26 Oct 2017 21:53:42 GMT
[FLINK-7051] [table] Bump Calcite version to 1.14.

This closes #4873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d830881
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d830881
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d830881

Branch: refs/heads/master
Commit: 4d8308810b21fe6b760f7c7f30788fb1f4a866d7
Parents: 6f83b41
Author: Haohui Mai <wheat9@apache.org>
Authored: Fri Oct 20 20:32:34 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 26 21:57:11 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |   7 +-
 .../flink/table/calcite/CalciteConfig.scala     |  10 +
 .../flink/table/calcite/FlinkPlannerImpl.scala  |   7 +-
 .../flink/table/calcite/FlinkRelBuilder.scala   |   4 +-
 .../table/catalog/ExternalCatalogSchema.scala   |   4 +-
 .../codegen/AggregationCodeGenerator.scala      |  28 ---
 .../table/functions/utils/AggSqlFunction.scala  |   9 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   9 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   2 +
 .../rules/dataSet/DataSetAggregateRule.scala    |  44 +---
 .../DataSetAggregateWithNullValuesRule.scala    |   3 +-
 .../logical/DecomposeGroupingSetRule.scala      | 148 +++++++++++++
 .../table/runtime/aggregate/AggregateUtil.scala |  75 +------
 .../runtime/aggregate/DataSetAggFunction.scala  |   3 -
 .../aggregate/DataSetFinalAggFunction.scala     |   3 -
 .../aggregate/GeneratedAggregations.scala       |   7 -
 .../runtime/batch/sql/GroupingSetsITCase.java   |  60 ++---
 .../table/api/batch/sql/AggregateTest.scala     | 181 +--------------
 .../api/batch/sql/DistinctAggregateTest.scala   |   6 +-
 .../table/api/batch/sql/GroupingSetsTest.scala  | 219 ++++++++++---------
 .../catalog/ExternalCatalogSchemaTest.scala     |  14 +-
 21 files changed, 359 insertions(+), 484 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 60a325e..22f00fe 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -62,7 +62,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.13.0</version>
+			<version>1.14.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>
@@ -237,6 +237,7 @@ under the License.
 									<!-- Calcite's dependencies -->
 									<include>com.google.guava:guava</include>
 									<include>net.hydromatic:*</include>
+									<include>com.esri.geometry:*</include>
 
 									<!-- flink-table dependencies -->
 									<include>commons-configuration:*</include>
@@ -263,6 +264,10 @@ under the License.
 									<pattern>org.pentaho</pattern>
 									<shadedPattern>org.apache.flink.calcite.shaded.org.pentaho</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>com.esri</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.com.esri</shadedPattern>
+								</relocation>
 
 								<!-- flink-table dependencies -->
 								<relocation>

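Note: Calcite 1.14 introduces geospatial functions backed by the Esri
geometry library, which is presumably why com.esri.geometry is now bundled
and relocated into the shaded flink-table jar alongside Calcite's other
dependencies, keeping it off the user classpath.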
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index 871f194..accc628 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.calcite
 
+import java.util.Properties
+
+import org.apache.calcite.config.{CalciteConnectionConfig, CalciteConnectionConfigImpl, CalciteConnectionProperty}
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -304,4 +307,11 @@ object CalciteConfig {
   def createBuilder(): CalciteConfigBuilder = {
     new CalciteConfigBuilder
   }
+
+  def connectionConfig(parserConfig : SqlParser.Config): CalciteConnectionConfig = {
+    val prop = new Properties()
+    prop.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName,
+      String.valueOf(parserConfig.caseSensitive))
+    new CalciteConnectionConfigImpl(prop)
+  }
 }

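Note: in Calcite 1.14 the CalciteCatalogReader constructor takes a
CalciteConnectionConfig instead of a plain caseSensitive flag (see the
FlinkPlannerImpl and FlinkRelBuilder hunks below). The new connectionConfig
helper bridges the two. A minimal usage sketch, with a hypothetical parser
config:

  import org.apache.calcite.sql.parser.SqlParser
  import org.apache.flink.table.calcite.CalciteConfig

  // Hypothetical parser config; only its case-sensitivity setting is
  // carried over into the connection config.
  val parserConfig = SqlParser.configBuilder()
    .setCaseSensitive(false)
    .build()

  // Wraps CASE_SENSITIVE=false into a CalciteConnectionConfig, ready to
  // be passed as the last argument of the 1.14 CalciteCatalogReader.
  val connConfig = CalciteConfig.connectionConfig(parserConfig)
  assert(!connConfig.caseSensitive())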
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 4d9acaa..d573086 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,8 +19,10 @@
 package org.apache.flink.table.calcite
 
 import java.util
+import java.util.Properties
 
 import com.google.common.collect.ImmutableList
+import org.apache.calcite.config.{CalciteConnectionConfig, CalciteConnectionConfigImpl, CalciteConnectionProperty}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
@@ -157,9 +159,10 @@ class FlinkPlannerImpl(
     val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
     new CalciteCatalogReader(
       CalciteSchema.from(rootSchema),
-      parserConfig.caseSensitive,
       CalciteSchema.from(defaultSchema).path(null),
-      typeFactory)
+      typeFactory,
+      CalciteConfig.connectionConfig(parserConfig)
+    )
   }
 
   private def createRexBuilder: RexBuilder = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 54904b9..1ac9b53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -85,9 +85,9 @@ object FlinkRelBuilder {
     val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
     val relOptSchema = new CalciteCatalogReader(
       calciteSchema,
-      config.getParserConfig.caseSensitive(),
       Collections.emptyList(),
-      typeFactory)
+      typeFactory,
+      CalciteConfig.connectionConfig(config.getParserConfig))
 
     new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 0f1efcf..d87d665 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -94,9 +94,7 @@ class ExternalCatalogSchema(
 
   override def getTableNames: JSet[String] = JCollections.emptySet[String]
 
-  override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = true
-
-  override def snapshot(l: Long): Schema = this
+  override def snapshot(v: SchemaVersion): Schema = this
 
   /**
     * Registers sub-Schemas to current schema plus

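Note: Calcite 1.14 reworked schema versioning: contentsHaveChangedSince(long,
long) was removed from the Schema interface and snapshot() now takes a
SchemaVersion instead of a timestamp. Returning this from snapshot preserves
the previous behavior of always exposing the live catalog contents.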
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 82a2420..c85b111 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -78,8 +78,6 @@ class AggregationCodeGenerator(
     * @param fwdMapping  The mapping of input fields to output fields
     * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we
     *                     assume that both rows have the accumulators at the same position.
-    * @param constantFlags An optional parameter to define where to set constant boolean flags in
-    *                      the output row.
     * @param outputArity The number of fields in the output row.
     * @param needRetract a flag to indicate if the aggregate needs the retract method
     * @param needMerge a flag to indicate if the aggregate needs the merge method
@@ -97,7 +95,6 @@ class AggregationCodeGenerator(
       partialResults: Boolean,
       fwdMapping: Array[Int],
       mergeMapping: Option[Array[Int]],
-      constantFlags: Option[Array[(Int, Boolean)]],
       outputArity: Int,
       needRetract: Boolean,
       needMerge: Boolean,
@@ -468,30 +465,6 @@ class AggregationCodeGenerator(
          |  }""".stripMargin
     }
 
-    def genSetConstantFlags: String = {
-
-      val sig: String =
-        j"""
-           |  public final void setConstantFlags(org.apache.flink.types.Row output)
-           |    """.stripMargin
-
-      val setFlags: String = if (constantFlags.isDefined) {
-        {
-          for (cf <- constantFlags.get) yield {
-            j"""
-               |    output.setField(${cf._1}, ${if (cf._2) "true" else "false"});"""
-            .stripMargin
-          }
-        }.mkString("\n")
-      } else {
-        ""
-      }
-
-      j"""$sig {
-         |$setFlags
-         |  }""".stripMargin
-    }
-
     def genCreateOutputRow: String = {
       j"""
          |  public final org.apache.flink.types.Row createOutputRow() {
@@ -585,7 +558,6 @@ class AggregationCodeGenerator(
       genRetract,
       genCreateAccumulators,
       genSetForwardedFields,
-      genSetConstantFlags,
       genCreateOutputRow,
       genMergeAccumulatorsPair,
       genResetAccumulator).mkString("\n")

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index f44598b..4b1e921 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -18,7 +18,11 @@
 
 package org.apache.flink.table.functions.utils
 
+import java.util
+import java.util.Collections
+
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
@@ -58,7 +62,8 @@ class AggSqlFunction(
     // will be generated when translating the calcite relnode to flink runtime execution plan
     null,
     false,
-    requiresOver
+    requiresOver,
+    typeFactory
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
@@ -66,6 +71,8 @@ class AggSqlFunction(
   override def isDeterministic: Boolean = aggregateFunction.isDeterministic
 
   override def toString: String = displayName
+
+  override def getParamTypes: util.List[RelDataType] = null
 }
 
 object AggSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index bbc5746..bdc3d7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -45,8 +45,7 @@ class DataSetAggregate(
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     rowRelDataType: RelDataType,
     inputType: RelDataType,
-    grouping: Array[Int],
-    inGroupingSet: Boolean)
+    grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -59,8 +58,7 @@ class DataSetAggregate(
       namedAggregates,
       getRowType,
       inputType,
-      grouping,
-      inGroupingSet)
+      grouping)
   }
 
   override def toString: String = {
@@ -111,8 +109,7 @@ class DataSetAggregate(
         input.getRowType,
         inputDS.getType.asInstanceOf[RowTypeInfo].getFieldTypes,
         rowRelDataType,
-        grouping,
-        inGroupingSet)
+        grouping)
 
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 36fdc6c..dcc735d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -145,6 +145,8 @@ object FlinkRuleSets {
     ReduceExpressionsRule.JOIN_INSTANCE,
     ProjectToWindowRule.PROJECT,
 
+    // Transform grouping sets
+    DecomposeGroupingSetRule.INSTANCE,
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
     WindowPropertiesRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
index b4d5bc9..9a31617 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -62,41 +62,15 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASET)
 
-    if (agg.indicator) {
-        agg.groupSets.map(set =>
-          new DataSetAggregate(
-            rel.getCluster,
-            traitSet,
-            convInput,
-            agg.getNamedAggCalls,
-            rel.getRowType,
-            agg.getInput.getRowType,
-            set.toArray,
-            inGroupingSet = true
-          ).asInstanceOf[RelNode]
-        ).reduce(
-          (rel1, rel2) => {
-            new DataSetUnion(
-              rel.getCluster,
-              traitSet,
-              rel1,
-              rel2,
-              rel.getRowType
-            )
-          }
-        )
-    } else {
-      new DataSetAggregate(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        agg.getNamedAggCalls,
-        rel.getRowType,
-        agg.getInput.getRowType,
-        agg.getGroupSet.toArray,
-        inGroupingSet = false
-      )
-    }
+    new DataSetAggregate(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray
+    )
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
index f8b7fd7..4a1e6d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -79,8 +79,7 @@ class DataSetAggregateWithNullValuesRule
       agg.getNamedAggCalls,
       rel.getRowType,
       agg.getInput.getRowType,
-      agg.getGroupSet.toArray,
-      inGroupingSet = false
+      agg.getGroupSet.toArray
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/DecomposeGroupingSetRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/DecomposeGroupingSetRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/DecomposeGroupingSetRule.scala
new file mode 100644
index 0000000..922ea65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/DecomposeGroupingSetRule.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+class DecomposeGroupingSetRule
+  extends RelOptRule(
+    operand(classOf[LogicalAggregate], any),
+  "DecomposeGroupingSetRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    !agg.getGroupSets.isEmpty &&
+      DecomposeGroupingSetRule.getGroupIdExprIndexes(agg.getAggCallList).nonEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+    val groupIdExprs = DecomposeGroupingSetRule.getGroupIdExprIndexes(agg.getAggCallList).toSet
+
+    val subAggs = agg.groupSets.map(set =>
+      DecomposeGroupingSetRule.decompose(call.builder(), agg, groupIdExprs, set))
+
+    val union = subAggs.reduce((l, r) => new LogicalUnion(
+      agg.getCluster,
+      agg.getTraitSet,
+      Seq(l, r),
+      true
+    ))
+    call.transformTo(union)
+  }
+}
+
+object DecomposeGroupingSetRule {
+  val INSTANCE = new DecomposeGroupingSetRule
+
+  private def getGroupIdExprIndexes(aggCalls: Seq[AggregateCall]): Seq[Int] = {
+    aggCalls.zipWithIndex.filter { case (call, _) =>
+        call.getAggregation.getKind match {
+          case SqlKind.GROUP_ID | SqlKind.GROUPING | SqlKind.GROUPING_ID =>
+            true
+          case _ =>
+            false
+        }
+    }.map { case (_, idx) => idx}
+  }
+
+  private def decompose(
+     relBuilder: RelBuilder,
+     agg: LogicalAggregate,
+     groupExprIndexes : Set[Int],
+     groupSet: ImmutableBitSet) = {
+
+    val aggsWithIndexes = agg.getAggCallList.zipWithIndex
+
+    // construct aggregate without groupExpressions
+    val subAgg = new LogicalAggregate(
+      agg.getCluster,
+      agg.getTraitSet,
+      agg.getInput,
+      false,
+      groupSet,
+      Seq(),
+      aggsWithIndexes.collect{ case (call, idx) if !groupExprIndexes.contains(idx) => call }
+    )
+    relBuilder.push(subAgg)
+
+    val rexBuilder = relBuilder.getRexBuilder
+    // get names of grouping fields
+    val groupingFieldsName = Seq.range(0, agg.getGroupCount)
+      .map(x => agg.getRowType.getFieldNames.get(x))
+
+    // create null literals for all grouping fields
+    val groupingFields: Array[RexNode] = Seq.range(0, agg.getGroupCount)
+      .map(x => rexBuilder.makeNullLiteral(agg.getRowType.getFieldList.get(x).getType)).toArray
+    // override null literals with field access for grouping fields of current aggregation
+    groupSet.toList.zipWithIndex.foreach { case (group, idx) =>
+      groupingFields(group) = rexBuilder.makeInputRef(relBuilder.peek(), idx)
+    }
+
+    var aggCnt = 0
+    val aggFields = aggsWithIndexes.map {
+      case (call, idx) if groupExprIndexes.contains(idx) =>
+        // create literal for group expression
+        lowerGroupExpr(rexBuilder, call, groupSet)
+      case _ =>
+        // create access to aggregation result
+        val aggResult = rexBuilder.makeInputRef(subAgg, subAgg.getGroupCount + aggCnt)
+        aggCnt += 1
+        aggResult
+    }
+
+    // add a projection to establish the result schema and set the values of the group expressions.
+    relBuilder.project(
+      groupingFields.toSeq ++ aggFields,
+      groupingFieldsName ++ agg.getAggCallList.map(_.name))
+    // return aggregation + projection
+    relBuilder.build()
+  }
+
+  /** Returns a literal for a given group expression. */
+  private def lowerGroupExpr(
+      builder: RexBuilder,
+      call: AggregateCall,
+      groupSet: ImmutableBitSet) : RexNode = {
+
+    val groups = groupSet.asSet()
+
+    call.getAggregation.getKind match {
+      case SqlKind.GROUP_ID =>
+        val id = groupSet.asList().map(x => 1 << x).sum
+        builder.makeLiteral(id, call.getType, false)
+      case SqlKind.GROUPING | SqlKind.GROUPING_ID =>
+        val res = call.getArgList.foldLeft(0)((res, arg) =>
+          (res << 1) + (if (groups.contains(arg)) 1 else 0)
+        )
+        builder.makeLiteral(res, call.getType, false)
+      case _ => builder.constantNull()
+    }
+  }
+}

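Note: DecomposeGroupingSetRule replaces the earlier indicator-flag
translation (removed below from DataSetAggregateRule and the aggregation
code generator): each grouping set becomes its own LogicalAggregate, the
per-set results are unioned, and GROUPING/GROUPING_ID/GROUP_ID calls are
lowered to literals in lowerGroupExpr. A standalone sketch of that
arithmetic, assuming hypothetical field indexes b=0 and c=1 in the
aggregate input, for the grouping set {b}:

  // GROUPING / GROUPING_ID: one bit per argument, set to 1 if the field
  // is part of the current grouping set (mirrors the foldLeft above).
  val groupSet = Set(0)                 // grouping set {b}
  val args = Seq(0, 1)                  // GROUPING_ID(b, c)
  val gid = args.foldLeft(0)((res, arg) =>
    (res << 1) + (if (groupSet.contains(arg)) 1 else 0))  // = 2

  // GROUP_ID: sums 1 << index over the fields of the grouping set.
  val g = groupSet.toList.map(x => 1 << x).sum             // = 1

These values line up with the literal projections ("1 AS g", "2 AS gid")
in the updated GroupingSetsTest plans further down.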
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index bdfdbf5..a867b1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -50,7 +50,6 @@ import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 object AggregateUtil {
 
@@ -104,7 +103,6 @@ object AggregateUtil {
       partialResults = false,
       forwardMapping,
       None,
-      None,
       outputArity,
       needRetract = false,
       needMerge = false,
@@ -184,7 +182,6 @@ object AggregateUtil {
       partialResults = false,
       groupings,
       None,
-      None,
       outputArity,
       consumeRetraction,
       needMerge = false,
@@ -250,7 +247,6 @@ object AggregateUtil {
       partialResults = false,
       forwardMapping,
       None,
-      None,
       outputArity,
       needRetract,
       needMerge = false,
@@ -358,7 +354,7 @@ object AggregateUtil {
       case SlidingGroupWindow(_, time, size, slide) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
         size match {
-          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+          case Literal(_: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
             // pre-tumble incremental aggregates on time-windows
             val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
             val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
@@ -382,7 +378,6 @@ object AggregateUtil {
       partialResults = true,
       groupings,
       None,
-      None,
       outputArity,
       needRetract,
       needMerge = false,
@@ -461,7 +456,6 @@ object AggregateUtil {
           partialResults = true,
           groupings.indices.toArray,
           Some(aggregates.indices.map(_ + groupings.length).toArray),
-          None,
           keysAndAggregatesArity + 1,
           needRetract,
           needMerge = true,
@@ -565,7 +559,6 @@ object AggregateUtil {
       partialResults = true,
       groupings,
       Some(aggregates.indices.map(_ + groupings.length).toArray),
-      None,
       outputType.getFieldCount,
       needRetract,
       needMerge = true,
@@ -582,7 +575,6 @@ object AggregateUtil {
       partialResults = false,
       groupings.indices.toArray,
       Some(aggregates.indices.map(_ + groupings.length).toArray),
-      None,
       outputType.getFieldCount,
       needRetract,
       needMerge = true,
@@ -730,7 +722,6 @@ object AggregateUtil {
           partialResults = true,
           groupings.indices.toArray,
           Some(aggregates.indices.map(_ + groupings.length).toArray),
-          None,
           groupings.length + aggregates.length + 2,
           needRetract,
           needMerge = true,
@@ -805,7 +796,6 @@ object AggregateUtil {
           partialResults = true,
           groupings.indices.toArray,
           Some(aggregates.indices.map(_ + groupings.length).toArray),
-          None,
           groupings.length + aggregates.length + 2,
           needRetract,
           needMerge = true,
@@ -837,8 +827,7 @@ object AggregateUtil {
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
-      groupings: Array[Int],
-      inGroupingSet: Boolean): (Option[DataSetPreAggFunction],
+      groupings: Array[Int]): (Option[DataSetPreAggFunction],
         Option[TypeInformation[Row]],
         RichGroupReduceFunction[Row, Row]) = {
 
@@ -855,18 +844,6 @@ object AggregateUtil {
       outputType
     )
 
-    val constantFlags: Option[Array[(Int, Boolean)]] =
-    if (inGroupingSet) {
-
-      val groupingSetsMapping = getGroupingSetsIndicatorMapping(inputType, outputType)
-      val nonNullKeysFields = gkeyOutMapping.map(_._1)
-      val flags = for ((in, out) <- groupingSetsMapping) yield
-        (out, !nonNullKeysFields.contains(in))
-      Some(flags)
-    } else {
-      None
-    }
-
     val aggOutFields = aggOutMapping.map(_._1)
 
     if (doAllSupportPartialMerge(aggregates)) {
@@ -886,7 +863,6 @@ object AggregateUtil {
         partialResults = true,
         groupings,
         None,
-        None,
         groupings.length + aggregates.length,
         needRetract,
         needMerge = false,
@@ -913,7 +889,6 @@ object AggregateUtil {
         partialResults = false,
         gkeyMapping,
         Some(aggregates.indices.map(_ + groupings.length).toArray),
-        constantFlags,
         outputType.getFieldCount,
         needRetract,
         needMerge = true,
@@ -937,7 +912,6 @@ object AggregateUtil {
         partialResults = false,
         groupings,
         None,
-        constantFlags,
         outputType.getFieldCount,
         needRetract,
         needMerge = false,
@@ -1035,7 +1009,6 @@ object AggregateUtil {
       partialResults = false,
       groupingKeys,
       None,
-      None,
       outputArity,
       needRetract,
       needMerge,
@@ -1099,50 +1072,6 @@ object AggregateUtil {
     (groupOutMapping, aggOutMapping)
   }
 
-  /**
-    * Determines the mapping of grouping keys to boolean indicators that describe the
-    * current grouping set.
-    *
-    * E.g.: Given we group on f1 and f2 of the input type, the output type contains two
-    * boolean indicator fields i$f1 and i$f2.
-    */
-  private def getGroupingSetsIndicatorMapping(
-    inputType: RelDataType,
-    outputType: RelDataType): Array[(Int, Int)] = {
-
-    val inputFields = inputType.getFieldList.map(_.getName)
-
-    // map from field -> i$field or field -> i$field_0
-    val groupingFields = inputFields.map(inputFieldName => {
-      val base = "i$" + inputFieldName
-      var name = base
-      var i = 0
-      while (inputFields.contains(name)) {
-          name = base + "_" + i // if i$XXX is already a field it will be suffixed by _NUMBER
-          i = i + 1
-        }
-        inputFieldName -> name
-      }).toMap
-
-    val outputFields = outputType.getFieldList
-
-    var mappingsBuffer = ArrayBuffer[(Int, Int)]()
-    for (i <- outputFields.indices) {
-      for (j <- outputFields.indices) {
-        val possibleKey = outputFields(i).getName
-        val possibleIndicator1 = outputFields(j).getName
-        // get indicator for output field
-        val possibleIndicator2 = groupingFields.getOrElse(possibleKey, null)
-
-        // check if indicator names match
-        if (possibleIndicator1 == possibleIndicator2) {
-          mappingsBuffer += ((i, j))
-        }
-      }
-    }
-    mappingsBuffer.toArray
-  }
-
   private def isTimeWindow(window: LogicalWindow) = {
     window match {
       case TumblingGroupWindow(_, _, size) => isTimeIntervalLiteral(size)

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index 83e1b13..bc0c163 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -77,9 +77,6 @@ class DataSetAggFunction(
     // set agg results to output
     function.setAggregationResults(accumulators, output)
 
-    // set grouping set flags to output
-    function.setConstantFlags(output)
-
     out.collect(output)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
index 5276271..3b3be70 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
@@ -77,9 +77,6 @@ class DataSetFinalAggFunction(
     // get final aggregate value and set to output.
     function.setAggregationResults(accumulators, output)
 
-    // set grouping set flags to output
-    function.setConstantFlags(output)
-
     out.collect(output)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
index 7b20114..7549db5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
@@ -54,13 +54,6 @@ abstract class GeneratedAggregations extends Function {
   def setForwardedFields(input: Row, output: Row)
 
   /**
-    * Sets constant flags (boolean fields) to an output row.
-    *
-    * @param output The output row to which the constant flags are set.
-    */
-  def setConstantFlags(output: Row)
-
-  /**
     * Accumulates the input values to the accumulators.
     *
     * @param accumulators the accumulators (saved in a row) which contains the current

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index 455e8ce..66043cc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -84,38 +84,40 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
 				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
 				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
-				" GROUPING_ID(f1, f2) as gid " +
+				" GROUPING_ID(f1, f2) as gid, " +
+				" COUNT(*) as cnt" +
 				" FROM " + TABLE_NAME +
-				" GROUP BY GROUPING SETS (f1, f2)";
+				" GROUP BY GROUPING SETS (f1, f2, ())";
 
 		String expected =
-			"1,null,1,1,0,1,0,1,1\n" +
-			"6,null,18,1,0,1,0,1,1\n" +
-			"2,null,2,1,0,1,0,1,1\n" +
-			"4,null,8,1,0,1,0,1,1\n" +
-			"5,null,13,1,0,1,0,1,1\n" +
-			"3,null,5,1,0,1,0,1,1\n" +
-			"null,Comment#11,17,2,1,0,1,0,2\n" +
-			"null,Comment#8,14,2,1,0,1,0,2\n" +
-			"null,Comment#2,8,2,1,0,1,0,2\n" +
-			"null,Comment#1,7,2,1,0,1,0,2\n" +
-			"null,Comment#14,20,2,1,0,1,0,2\n" +
-			"null,Comment#7,13,2,1,0,1,0,2\n" +
-			"null,Comment#6,12,2,1,0,1,0,2\n" +
-			"null,Comment#3,9,2,1,0,1,0,2\n" +
-			"null,Comment#12,18,2,1,0,1,0,2\n" +
-			"null,Comment#5,11,2,1,0,1,0,2\n" +
-			"null,Comment#15,21,2,1,0,1,0,2\n" +
-			"null,Comment#4,10,2,1,0,1,0,2\n" +
-			"null,Hi,1,2,1,0,1,0,2\n" +
-			"null,Comment#10,16,2,1,0,1,0,2\n" +
-			"null,Hello world,3,2,1,0,1,0,2\n" +
-			"null,I am fine.,5,2,1,0,1,0,2\n" +
-			"null,Hello world, how are you?,4,2,1,0,1,0,2\n" +
-			"null,Comment#9,15,2,1,0,1,0,2\n" +
-			"null,Comment#13,19,2,1,0,1,0,2\n" +
-			"null,Luke Skywalker,6,2,1,0,1,0,2\n" +
-			"null,Hello,2,2,1,0,1,0,2";
+			"1,null,1,1,1,0,1,0,2,1\n" +
+			"6,null,18,1,1,0,1,0,2,6\n" +
+			"2,null,2,1,1,0,1,0,2,2\n" +
+			"4,null,8,1,1,0,1,0,2,4\n" +
+			"5,null,13,1,1,0,1,0,2,5\n" +
+			"3,null,5,1,1,0,1,0,2,3\n" +
+			"null,Comment#11,17,2,0,1,0,1,1,1\n" +
+			"null,Comment#8,14,2,0,1,0,1,1,1\n" +
+			"null,Comment#2,8,2,0,1,0,1,1,1\n" +
+			"null,Comment#1,7,2,0,1,0,1,1,1\n" +
+			"null,Comment#14,20,2,0,1,0,1,1,1\n" +
+			"null,Comment#7,13,2,0,1,0,1,1,1\n" +
+			"null,Comment#6,12,2,0,1,0,1,1,1\n" +
+			"null,Comment#3,9,2,0,1,0,1,1,1\n" +
+			"null,Comment#12,18,2,0,1,0,1,1,1\n" +
+			"null,Comment#5,11,2,0,1,0,1,1,1\n" +
+			"null,Comment#15,21,2,0,1,0,1,1,1\n" +
+			"null,Comment#4,10,2,0,1,0,1,1,1\n" +
+			"null,Hi,1,2,0,1,0,1,1,1\n" +
+			"null,Comment#10,16,2,0,1,0,1,1,1\n" +
+			"null,Hello world,3,2,0,1,0,1,1,1\n" +
+			"null,I am fine.,5,2,0,1,0,1,1,1\n" +
+			"null,Hello world, how are you?,4,2,0,1,0,1,1,1\n" +
+			"null,Comment#9,15,2,0,1,0,1,1,1\n" +
+			"null,Comment#13,19,2,0,1,0,1,1,1\n" +
+			"null,Luke Skywalker,6,2,0,1,0,1,1,1\n" +
+			"null,Hello,2,2,0,1,0,1,1,1\n" +
+			"null,null,11,0,0,0,0,0,0,21";
 
 		checkSql(query, expected);
 	}

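Note: the expected results change for two reasons. First, the query now
also computes COUNT(*) and includes the empty grouping set (), which adds
the final grand-total row (null,null,11,...,21: the average and count over
all 21 input rows). Second, the GROUPING/GROUPING_ID columns now follow the
convention implemented by lowerGroupExpr above, where a field's bit is 1
when it is part of the row's grouping set.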
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
index 445fed3..f2e250b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
 
 /**
@@ -199,183 +199,4 @@ class AggregateTest extends TableTestBase {
     )
     util.verifySql(sqlQuery, expected)
   }
-
-  @Test
-  def testGroupingSets(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
-      "GROUP BY GROUPING SETS (b, c)"
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetUnion",
-        unaryNode(
-          "DataSetAggregate",
-          batchTableNode(0),
-          term("groupBy", "b"),
-          term("select", "b", "AVG(a) AS c")
-        ),
-        unaryNode(
-          "DataSetAggregate",
-          batchTableNode(0),
-          term("groupBy", "c"),
-          term("select", "c AS b", "AVG(a) AS c")
-        ),
-        term("union", "b", "c", "i$b", "i$c", "a")
-      ),
-      term("select",
-        "CASE(i$b, null, b) AS b",
-        "CASE(i$c, null, c) AS c",
-        "a",
-        "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g") // GROUP_ID()
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testCube(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " +
-      "GROUPING(b) as gb, GROUPING(c) as gc, " +
-      "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " +
-      "GROUPING_ID(b, c) as gid " +
-      "FROM MyTable " +
-      "GROUP BY CUBE (b, c)"
-
-    val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
-    )
-
-    val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
-    )
-
-    val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "c"),
-      term("select", "c AS b",
-           "AVG(a) AS c")
-    )
-
-    val group4 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union2 = binaryNode(
-      "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union3 = binaryNode(
-      "DataSetUnion",
-      union2, group4,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union3,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testRollup(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g, " +
-                   "GROUPING(b) as gb, GROUPING(c) as gc, " +
-                   "GROUPING_ID(b) as gib, GROUPING_ID(c) as gic, " +
-                   "GROUPING_ID(b, c) as gid " + " FROM MyTable " +
-                   "GROUP BY ROLLUP (b, c)"
-
-    val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
-    )
-
-    val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
-    )
-
-    val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union2 = binaryNode(
-      "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union2,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
-    )
-
-    util.verifySql(sqlQuery, aggregate)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
index 7ea2d8b..ce008e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
 
 class DistinctAggregateTest extends TableTestBase {
@@ -322,7 +322,7 @@ class DistinctAggregateTest extends TableTestBase {
         term("select", "a", "b", "COUNT(a) AS EXPR$1")
       ),
       term("groupBy", "a"),
-      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
+      term("select", "a", "$SUM0(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
     )
 
     util.verifySql(sqlQuery, expected)
@@ -348,7 +348,7 @@ class DistinctAggregateTest extends TableTestBase {
         term("select", "a", "b", "COUNT(*) AS EXPR$1")
       ),
       term("groupBy", "a"),
-      term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
+      term("select", "a", "$SUM0(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
     )
 
     util.verifySql(sqlQuery, expected)

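Note: $SUM0 is Calcite's internal variant of SUM that returns 0 instead of
NULL on empty input. With Calcite 1.14 the distinct-aggregate rewrite sums
the per-group counts with $SUM0, presumably because a COUNT can be 0 but
never NULL; the expected plans are updated accordingly.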
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
index af46c5b..9f3d2b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
 
 class GroupingSetsTest extends TableTestBase {
@@ -34,29 +34,29 @@ class GroupingSetsTest extends TableTestBase {
     val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
       "GROUP BY GROUPING SETS (b, c)"
 
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetUnion",
+    val aggregate = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
         unaryNode(
           "DataSetAggregate",
           batchTableNode(0),
           term("groupBy", "b"),
-          term("select", "b", "AVG(a) AS c")
+          term("select", "b", "AVG(a) AS a")
         ),
+        term("select", "b", "null AS c", "a", "1 AS g")
+      ),
+      unaryNode(
+        "DataSetCalc",
         unaryNode(
           "DataSetAggregate",
           batchTableNode(0),
           term("groupBy", "c"),
-          term("select", "c AS b", "AVG(a) AS c")
+          term("select", "c", "AVG(a) AS a")
         ),
-        term("union", "b", "c", "i$b", "i$c", "a")
+        term("select", "null AS b", "c", "a", "2 AS g")
       ),
-      term("select",
-        "CASE(i$b, null, b) AS b",
-        "CASE(i$c, null, c) AS c",
-        "a",
-        "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g") // GROUP_ID()
+      term("union", "b", "c", "a", "g")
     )
 
     util.verifySql(sqlQuery, aggregate)
@@ -75,70 +75,79 @@ class GroupingSetsTest extends TableTestBase {
       "GROUP BY CUBE (b, c)"
 
     val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "b", "c"),
+        term("select", "b", "c", "AVG(a) AS a")
+      ),
+      term("select", "b", "c", "a", "3 AS g", "1 AS gb", "1 AS gc",
+        "1 AS gib", "1 AS gic", "3 AS gid")
     )
 
     val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "b"),
+        term("select", "b", "AVG(a) AS a")
+      ),
+      term("select", "b", "null AS c", "a", "1 AS g", "1 AS gb", "0 AS gc",
+        "1 AS gib", "0 AS gic", "2 AS gid")
     )
 
     val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "c"),
-      term("select", "c AS b",
-           "AVG(a) AS c")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "c"),
+        term("select", "c", "AVG(a) AS a")
+      ),
+      term("select", "null AS b", "c", "a", "2 AS g", "0 AS gb", "1 AS gc",
+        "0 AS gib", "1 AS gic", "1 AS gid")
     )
 
     val group4 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val union2 = binaryNode(
-      "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        unaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetValues",
+            batchTableNode(0),
+            tuples(List(null, null, null)),
+            term("values", "a", "b", "c")
+          ),
+          term("union", "a", "b", "c")
+        ),
+        term("select", "AVG(a) AS a")
+      ),
+      term("select", "null AS b", "null AS c", "a", "0 AS g", "0 AS gb", "0 AS gc",
+        "0 AS gib", "0 AS gic", "0 AS gid")
     )
 
-    val union3 = binaryNode(
+    val union = binaryNode(
       "DataSetUnion",
-      union2, group4,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union3,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
+      binaryNode(
+        "DataSetUnion",
+        binaryNode(
+          "DataSetUnion",
+          group1,
+          group2,
+          term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
+        ),
+        group3,
+        term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
+      ),
+      group4,
+      term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
     )
 
-    util.verifySql(sqlQuery, aggregate)
+    util.verifySql(sqlQuery, union)
   }
 
   @Test
@@ -153,55 +162,61 @@ class GroupingSetsTest extends TableTestBase {
                    "GROUP BY ROLLUP (b, c)"
 
     val group1 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b, c"),
-      term("select", "b", "c",
-           "AVG(a) AS i$b")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "b", "c"),
+        term("select", "b", "c", "AVG(a) AS a")
+      ),
+      term("select", "b", "c", "a", "3 AS g", "1 AS gb", "1 AS gc",
+        "1 AS gib", "1 AS gic", "3 AS gid")
     )
 
     val group2 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("groupBy", "b"),
-      term("select", "b",
-           "AVG(a) AS c")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "b"),
+        term("select", "b", "AVG(a) AS a")
+      ),
+      term("select", "b", "null AS c", "a", "1 AS g", "1 AS gb", "0 AS gc",
+        "1 AS gib", "0 AS gic", "2 AS gid")
     )
 
     val group3 = unaryNode(
-      "DataSetAggregate",
-      batchTableNode(0),
-      term("select",
-           "AVG(a) AS b")
-    )
-
-    val union1 = binaryNode(
-      "DataSetUnion",
-      group1, group2,
-      term("union", "b", "c", "i$b", "i$c", "a")
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        unaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetValues",
+            batchTableNode(0),
+            tuples(List(null, null, null)),
+            term("values", "a", "b", "c")
+          ),
+          term("union", "a", "b", "c")
+        ),
+        term("select", "AVG(a) AS a")
+      ),
+      term("select", "null AS b", "null AS c", "a", "0 AS g", "0 AS gb", "0 AS gc",
+        "0 AS gib", "0 AS gic", "0 AS gid")
     )
 
-    val union2 = binaryNode(
+    val union = binaryNode(
       "DataSetUnion",
-      union1, group3,
-      term("union", "b", "c", "i$b", "i$c", "a")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetCalc",
-      union2,
-      term("select",
-           "CASE(i$b, null, b) AS b",
-           "CASE(i$c, null, c) AS c",
-           "a",
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS g", // GROUP_ID()
-           "CASE(i$b, 1, 0) AS gb", // GROUPING(b)
-           "CASE(i$c, 1, 0) AS gc", // GROUPING(c)
-           "CASE(i$b, 1, 0) AS gib", // GROUPING_ID(b)
-           "CASE(i$c, 1, 0) AS gic", // GROUPING_ID(c)
-           "+(*(CASE(i$b, 1, 0), 2), CASE(i$c, 1, 0)) AS gid") // GROUPING_ID(b, c)
+      binaryNode(
+        "DataSetUnion",
+        group1,
+        group2,
+        term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
+      ),
+      group3,
+      term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
     )
 
-    util.verifySql(sqlQuery, aggregate)
+    util.verifySql(sqlQuery, union)
   }
 }

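Note: for the empty grouping set (the CUBE/ROLLUP grand total), the
expected plans now contain the DataSetValues + union pattern produced by
DataSetAggregateWithNullValuesRule: a single row of NULLs is unioned with
the input so that the global aggregate still emits one result row even on
empty input.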
http://git-wip-us.apache.org/repos/asf/flink/blob/4d830881/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 339ea52..f9920e7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -18,14 +18,16 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.Collections
+import java.util.{Collections, Properties}
 
 import com.google.common.collect.Lists
+import org.apache.calcite.config.{CalciteConnectionConfigImpl, CalciteConnectionProperty}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.validate.SqlMonikerType
-import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.CsvTableSource
@@ -49,11 +51,15 @@ class ExternalCatalogSchemaTest {
     ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
     externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
+    val prop = new Properties()
+    prop.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName, "false")
+    val calciteConnConfig = new CalciteConnectionConfigImpl(prop)
     calciteCatalogReader = new CalciteCatalogReader(
       CalciteSchema.from(rootSchemaPlus),
-      false,
       Collections.emptyList(),
-      typeFactory)
+      typeFactory,
+      calciteConnConfig
+    )
   }
 
   @Test

