flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/3] flink git commit: [FLINK-3596] Finalized DataSet RelNode refactoring
Date Thu, 10 Mar 2016 23:08:26 GMT
[FLINK-3596] Finalized DataSet RelNode refactoring


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d720b002
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d720b002
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d720b002

Branch: refs/heads/tableOnCalcite
Commit: d720b002a77d4ac3a2e41bfaeaf6ad87c2e415c0
Parents: 22621e0
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Mar 10 23:30:06 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Mar 10 23:30:43 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/dataset/DataSetAggregate.scala   |   7 +-
 .../table/plan/nodes/dataset/DataSetUnion.scala |   2 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |  12 +-
 .../rules/dataSet/DataSetAggregateRule.scala    |  55 +++++++
 .../plan/rules/dataSet/DataSetCalcRule.scala    |  53 ++++++
 .../plan/rules/dataSet/DataSetJoinRule.scala    | 115 +++++++++++++
 .../plan/rules/dataSet/DataSetScanRule.scala    |  50 ++++++
 .../plan/rules/dataSet/DataSetUnionRule.scala   |  54 ++++++
 .../dataSet/FlinkJoinUnionTransposeRule.scala   | 110 +++++++++++++
 .../plan/rules/logical/FlinkAggregateRule.scala |  55 -------
 .../plan/rules/logical/FlinkCalcRule.scala      |  53 ------
 .../plan/rules/logical/FlinkJoinRule.scala      | 115 -------------
 .../logical/FlinkJoinUnionTransposeRule.scala   | 110 -------------
 .../plan/rules/logical/FlinkScanRule.scala      |  52 ------
 .../plan/rules/logical/FlinkUnionRule.scala     |  54 ------
 .../table/runtime/aggregate/AggregateUtil.scala |  11 +-
 .../test/GroupedAggregationsITCase.scala.orig   | 164 -------------------
 17 files changed, 450 insertions(+), 622 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 3856c5f..9a9bf99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -24,7 +24,6 @@ import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.TypeConverter._
 import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
@@ -76,7 +75,7 @@ class DataSetAggregate(
       case _ => // ok
     }
 
-    val groupingKeys = (0 until grouping.length).toArray
+    val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
       inputType, rowType, grouping, config)
@@ -93,8 +92,8 @@ class DataSetAggregate(
     .toArray
 
     val rowTypeInfo = new RowTypeInfo(fieldTypes)
-    val mappedInput = inputDS.map(aggregateResult.mapFunc)
-    val groupReduceFunction = aggregateResult.reduceGroupFunc
+    val mappedInput = inputDS.map(aggregateResult._1)
+    val groupReduceFunction = aggregateResult._2
 
     if (groupingKeys.length > 0) {
       mappedInput.asInstanceOf[DataSet[Row]]

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index 462c4a5..a52a65e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{TableConfig, Row}
+import org.apache.flink.api.table.TableConfig
 
 /**
 * Flink RelNode which matches along with UnionOperator.

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index bd128b2..d8cc2b3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.plan.rules
 
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSets, RuleSet}
-import org.apache.flink.api.table.plan.rules.logical._
+import org.apache.flink.api.table.plan.rules.dataSet._
 
 object FlinkRuleSets {
 
@@ -94,11 +94,11 @@ object FlinkRuleSets {
     CalcMergeRule.INSTANCE,
 
     // translate to logical Flink nodes
-    FlinkAggregateRule.INSTANCE,
-    FlinkCalcRule.INSTANCE,
-    FlinkJoinRule.INSTANCE,
-    FlinkScanRule.INSTANCE,
-    FlinkUnionRule.INSTANCE
+    DataSetAggregateRule.INSTANCE,
+    DataSetCalcRule.INSTANCE,
+    DataSetJoinRule.INSTANCE,
+    DataSetScanRule.INSTANCE,
+    DataSetUnionRule.INSTANCE
   )
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
new file mode 100644
index 0000000..40afd4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+import scala.collection.JavaConversions._
+
+class DataSetAggregateRule
+  extends ConverterRule(
+      classOf[LogicalAggregate],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "FlinkAggregateRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
+
+      new DataSetAggregate(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        agg.getNamedAggCalls,
+        rel.getRowType,
+        agg.getInput.getRowType,
+        agg.toString,
+        agg.getGroupSet.toArray)
+      }
+  }
+
+object DataSetAggregateRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
new file mode 100644
index 0000000..3610819
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
+
+class DataSetCalcRule
+  extends ConverterRule(
+      classOf[LogicalCalc],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "FlinkCalcRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
+
+      new DataSetCalc(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        rel.getRowType,
+        calc.getProgram,
+        calc.toString,
+        description)
+    }
+  }
+
+object DataSetCalcRule {
+  val INSTANCE: RelOptRule = new DataSetCalcRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
new file mode 100644
index 0000000..06f7d51
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.{RexInputRef, RexCall}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+class DataSetJoinRule
+  extends ConverterRule(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "FlinkJoinRule")
+  {
+
+    override def matches(call: RelOptRuleCall): Boolean = {
+
+      val join = call.rel(0).asInstanceOf[LogicalJoin]
+      val children = join.getInputs
+      val rexBuilder = call.builder().getRexBuilder
+
+      val joinInfo = join.analyzeCondition()
+      val joinCondition = join.getCondition
+      val equiCondition =
+        joinInfo.getEquiCondition(children.get(0), children.get(1), rexBuilder)
+
+      // joins require at least one equi-condition
+      if (equiCondition.isAlwaysTrue) {
+        false
+      }
+      else {
+        // check that all equality predicates refer to field refs only (not computed expressions)
+        //   Note: Calcite treats equality predicates on expressions as non-equi predicates
+        joinCondition match {
+
+          // conjunction of join predicates
+          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.AND) =>
+
+            c.getOperands.asScala
+              // look at equality predicates only
+              .filter { o =>
+                o.isInstanceOf[RexCall] &&
+                o.asInstanceOf[RexCall].getOperator.equals(SqlStdOperatorTable.EQUALS)
+              }
+              // check that both children are field references
+              .map { o =>
+                o.asInstanceOf[RexCall].getOperands.get(0).isInstanceOf[RexInputRef] &&
+                o.asInstanceOf[RexCall].getOperands.get(1).isInstanceOf[RexInputRef]
+              }
+              // any equality predicate that does not refer to a field reference?
+              .reduce( (a, b) => a && b)
+
+          // single equi-join predicate
+          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.EQUALS) =>
+            c.getOperands.get(0).isInstanceOf[RexInputRef] &&
+              c.getOperands.get(1).isInstanceOf[RexInputRef]
+          case _ =>
+            false
+        }
+      }
+
+    }
+
+    def convert(rel: RelNode): RelNode = {
+
+      val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
+      val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
+      val joinInfo = join.analyzeCondition
+
+        new DataSetJoin(
+          rel.getCluster,
+          traitSet,
+          convLeft,
+          convRight,
+          rel.getRowType,
+          join.toString,
+          join.getCondition,
+          join.getRowType,
+          joinInfo,
+          joinInfo.pairs.toList,
+          JoinType.INNER,
+          null,
+          description)
+    }
+  }
+
+object DataSetJoinRule {
+  val INSTANCE: RelOptRule = new DataSetJoinRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
new file mode 100644
index 0000000..2865d9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource}
+
+class DataSetScanRule
+  extends ConverterRule(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "FlinkScanRule")
+  {
+    def convert(rel: RelNode): RelNode = {
+      val scan: TableScan = rel.asInstanceOf[TableScan]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+      new DataSetSource(
+        rel.getCluster,
+        traitSet,
+        scan.getTable,
+        rel.getRowType
+      )
+    }
+  }
+
+object DataSetScanRule {
+  val INSTANCE: RelOptRule = new DataSetScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
new file mode 100644
index 0000000..6ab64c6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
+
+class DataSetUnionRule
+  extends ConverterRule(
+      classOf[LogicalUnion],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "FlinkUnionRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+
+      val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
+      val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
+
+      new DataSetUnion(
+        rel.getCluster,
+        traitSet,
+        convLeft,
+        convRight,
+        rel.getRowType,
+        union.toString)
+    }
+  }
+
+object DataSetUnionRule {
+  val INSTANCE: RelOptRule = new DataSetUnionRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala
new file mode 100644
index 0000000..32b53e2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala
@@ -0,0 +1,110 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule.{any, operand, convert => convertTrait}
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.RelOptRuleOperand
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.RelNode
+import java.util.ArrayList
+import scala.collection.JavaConversions._
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.core.Join
+import org.apache.calcite.rel.core.Union
+
+/**
+ * This rule is a copy of Calcite's JoinUnionTransposeRule.
+ * Calcite's implementation checks whether one of the operands is a LogicalUnion,
+ * which fails in our case, when it matches with a DataSetUnion.
+ * This rule changes this check to match Union, instead of LogicalUnion only.
+ * The rest of the rule's logic has not been changed.
+ */
+class FlinkJoinUnionTransposeRule(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join = call.rel(0).asInstanceOf[Join]
+    val (unionRel: Union, otherInput: RelNode, unionOnLeft: Boolean) = {
+      if (call.rel(1).isInstanceOf[Union]) {
+        (call.rel(1).asInstanceOf[Union], call.rel(2).asInstanceOf[RelNode], true)
+      }
+      else {
+        (call.rel(2).asInstanceOf[Union], call.rel(1).asInstanceOf[RelNode], false)
+      }
+    }
+    
+    if (!unionRel.all) {
+      return
+    }
+    if (!join.getVariablesStopped.isEmpty) {
+      return
+    }
+    // The UNION ALL cannot be on the null generating side
+    // of an outer join (otherwise we might generate incorrect
+    // rows for the other side for join keys which lack a match
+    // in one or both branches of the union)
+    if (unionOnLeft) {
+      if (join.getJoinType.generatesNullsOnLeft) {
+        return
+      }
+    }
+    else {
+      if (join.getJoinType.generatesNullsOnRight) {
+        return
+      }
+    }
+    val newUnionInputs = new ArrayList[RelNode]
+    for (input <- unionRel.getInputs) {
+      val (joinLeft: RelNode, joinRight: RelNode) = {
+      if (unionOnLeft) {
+        (input, otherInput)
+      }
+      else {
+        (otherInput, input)
+      }
+    }
+
+      newUnionInputs.add(
+          join.copy(
+              join.getTraitSet,
+              join.getCondition,
+              joinLeft,
+              joinRight,
+              join.getJoinType,
+              join.isSemiJoinDone))
+    }
+    val newUnionRel = unionRel.copy(unionRel.getTraitSet, newUnionInputs, true)
+    call.transformTo(newUnionRel)
+  }
+}
+
+object FlinkJoinUnionTransposeRule {
+  val LEFT_UNION = new FlinkJoinUnionTransposeRule(
+      operand(classOf[LogicalJoin], operand(classOf[LogicalUnion], any),
+          operand(classOf[RelNode], any)),
+          "JoinUnionTransposeRule(Union-Other)")
+
+    val RIGHT_UNION = new FlinkJoinUnionTransposeRule(
+      operand(classOf[LogicalJoin], operand(classOf[RelNode], any),
+          operand(classOf[LogicalUnion], any)),
+          "JoinUnionTransposeRule(Other-Union)")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
deleted file mode 100644
index 01a5130..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
-import scala.collection.JavaConversions._
-
-class FlinkAggregateRule
-  extends ConverterRule(
-      classOf[LogicalAggregate],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "FlinkAggregateRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-      val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
-
-      new DataSetAggregate(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        agg.getNamedAggCalls,
-        rel.getRowType,
-        agg.getInput.getRowType,
-        agg.toString,
-        agg.getGroupSet.toArray)
-      }
-  }
-
-object FlinkAggregateRule {
-  val INSTANCE: RelOptRule = new FlinkAggregateRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
deleted file mode 100644
index f5e9c68..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
-
-class FlinkCalcRule
-  extends ConverterRule(
-      classOf[LogicalCalc],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "FlinkCalcRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-      val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
-
-      new DataSetCalc(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        rel.getRowType,
-        calc.getProgram,
-        calc.toString,
-        description)
-    }
-  }
-
-object FlinkCalcRule {
-  val INSTANCE: RelOptRule = new FlinkCalcRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
deleted file mode 100644
index c8ce944..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rex.{RexInputRef, RexCall}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
-class FlinkJoinRule
-  extends ConverterRule(
-      classOf[LogicalJoin],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "FlinkJoinRule")
-  {
-
-    override def matches(call: RelOptRuleCall): Boolean = {
-
-      val join = call.rel(0).asInstanceOf[LogicalJoin]
-      val children = join.getInputs
-      val rexBuilder = call.builder().getRexBuilder
-
-      val joinInfo = join.analyzeCondition()
-      val joinCondition = join.getCondition
-      val equiCondition =
-        joinInfo.getEquiCondition(children.get(0), children.get(1), rexBuilder)
-
-      // joins require at least one equi-condition
-      if (equiCondition.isAlwaysTrue) {
-        false
-      }
-      else {
-        // check that all equality predicates refer to field refs only (not computed expressions)
-        //   Note: Calcite treats equality predicates on expressions as non-equi predicates
-        joinCondition match {
-
-          // conjunction of join predicates
-          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.AND) =>
-
-            c.getOperands.asScala
-              // look at equality predicates only
-              .filter { o =>
-                o.isInstanceOf[RexCall] &&
-                o.asInstanceOf[RexCall].getOperator.equals(SqlStdOperatorTable.EQUALS)
-              }
-              // check that both children are field references
-              .map { o =>
-                o.asInstanceOf[RexCall].getOperands.get(0).isInstanceOf[RexInputRef] &&
-                o.asInstanceOf[RexCall].getOperands.get(1).isInstanceOf[RexInputRef]
-              }
-              // any equality predicate that does not refer to a field reference?
-              .reduce( (a, b) => a && b)
-
-          // single equi-join predicate
-          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.EQUALS) =>
-            c.getOperands.get(0).isInstanceOf[RexInputRef] &&
-              c.getOperands.get(1).isInstanceOf[RexInputRef]
-          case _ =>
-            false
-        }
-      }
-
-    }
-
-    def convert(rel: RelNode): RelNode = {
-
-      val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
-      val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
-      val joinInfo = join.analyzeCondition
-
-        new DataSetJoin(
-          rel.getCluster,
-          traitSet,
-          convLeft,
-          convRight,
-          rel.getRowType,
-          join.toString,
-          join.getCondition,
-          join.getRowType,
-          joinInfo,
-          joinInfo.pairs.toList,
-          JoinType.INNER,
-          null,
-          description)
-    }
-  }
-
-object FlinkJoinRule {
-  val INSTANCE: RelOptRule = new FlinkJoinRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
deleted file mode 100644
index ff3ee8f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.RelOptRule.{any, operand, convert => convertTrait}
-import org.apache.calcite.plan.RelOptRule
-import org.apache.calcite.plan.RelOptRuleOperand
-import org.apache.calcite.plan.RelOptRuleCall
-import org.apache.calcite.rel.RelNode
-import java.util.ArrayList
-import scala.collection.JavaConversions._
-import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.calcite.rel.core.Join
-import org.apache.calcite.rel.core.Union
-
-/**
- * This rule is a copy of Calcite's JoinUnionTransposeRule.
- * Calcite's implementation checks whether one of the operands is a LogicalUnion,
- * which fails in our case, when it matches with a DataSetUnion.
- * This rule changes this check to match Union, instead of LogicalUnion only.
- * The rest of the rule's logic has not been changed.
- */
-class FlinkJoinUnionTransposeRule(
-    operand: RelOptRuleOperand,
-    description: String) extends RelOptRule(operand, description) {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val join = call.rel(0).asInstanceOf[Join]
-    val (unionRel: Union, otherInput: RelNode, unionOnLeft: Boolean) = {
-      if (call.rel(1).isInstanceOf[Union]) {
-        (call.rel(1).asInstanceOf[Union], call.rel(2).asInstanceOf[RelNode], true)
-      }
-      else {
-        (call.rel(2).asInstanceOf[Union], call.rel(1).asInstanceOf[RelNode], false)
-      }
-    }
-    
-    if (!unionRel.all) {
-      return
-    }
-    if (!join.getVariablesStopped.isEmpty) {
-      return
-    }
-    // The UNION ALL cannot be on the null generating side
-    // of an outer join (otherwise we might generate incorrect
-    // rows for the other side for join keys which lack a match
-    // in one or both branches of the union)
-    if (unionOnLeft) {
-      if (join.getJoinType.generatesNullsOnLeft) {
-        return
-      }
-    }
-    else {
-      if (join.getJoinType.generatesNullsOnRight) {
-        return
-      }
-    }
-    val newUnionInputs = new ArrayList[RelNode]
-    for (input <- unionRel.getInputs) {
-      val (joinLeft: RelNode, joinRight: RelNode) = {
-      if (unionOnLeft) {
-        (input, otherInput)
-      }
-      else {
-        (otherInput, input)
-      }
-    }
-
-      newUnionInputs.add(
-          join.copy(
-              join.getTraitSet,
-              join.getCondition,
-              joinLeft,
-              joinRight,
-              join.getJoinType,
-              join.isSemiJoinDone))
-    }
-    val newUnionRel = unionRel.copy(unionRel.getTraitSet, newUnionInputs, true)
-    call.transformTo(newUnionRel)
-  }
-}
-
-object FlinkJoinUnionTransposeRule {
-  val LEFT_UNION = new FlinkJoinUnionTransposeRule(
-      operand(classOf[LogicalJoin], operand(classOf[LogicalUnion], any),
-          operand(classOf[RelNode], any)),
-          "JoinUnionTransposeRule(Union-Other)")
-
-    val RIGHT_UNION = new FlinkJoinUnionTransposeRule(
-      operand(classOf[LogicalJoin], operand(classOf[RelNode], any),
-          operand(classOf[LogicalUnion], any)),
-          "JoinUnionTransposeRule(Other-Union)")
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
deleted file mode 100644
index 21da504..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource}
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-class FlinkScanRule
-  extends ConverterRule(
-      classOf[LogicalTableScan],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "FlinkScanRule")
-  {
-    def convert(rel: RelNode): RelNode = {
-      val scan: TableScan = rel.asInstanceOf[TableScan]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-      new DataSetSource(
-        rel.getCluster,
-        traitSet,
-        scan.getTable,
-        rel.getRowType
-      )
-    }
-  }
-
-object FlinkScanRule {
-  val INSTANCE: RelOptRule = new FlinkScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
deleted file mode 100644
index 11600a2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}
-
-class FlinkUnionRule
-  extends ConverterRule(
-      classOf[LogicalUnion],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "FlinkUnionRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-
-      val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
-      val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
-
-      new DataSetUnion(
-        rel.getCluster,
-        traitSet,
-        convLeft,
-        convRight,
-        rel.getRowType,
-        union.toString)
-    }
-  }
-
-object FlinkUnionRule {
-  val INSTANCE: RelOptRule = new FlinkUnionRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 11857df..70d0497 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -26,7 +26,6 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.plan.{TypeConverter, PlanGenException}
 import org.apache.flink.api.table.plan.TypeConverter._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
@@ -66,7 +65,7 @@ object AggregateUtil {
   def createOperatorFunctionsForAggregates(namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType, outputType: RelDataType,
       groupings: Array[Int],
-      config: TableConfig): AggregateResult = {
+      config: TableConfig): (MapFunction[Any, Row], GroupReduceFunction[Row, Row] ) = {
 
     val aggregateFunctionsAndFieldIndexes =
       transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
@@ -85,7 +84,7 @@ object AggregateUtil {
 
     val mapFunction = new AggregateMapFunction[Row, Row](
         aggregates, aggFieldIndexes, groupings,
-        mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Any]]
+        mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
     // the mapping relation between field index of intermediate aggregate Row and output Row.
     val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -114,7 +113,7 @@ object AggregateUtil {
           aggOffsetMapping, intermediateRowArity)
       }
 
-    new AggregateResult(mapFunction, reduceGroupFunction)
+    (mapFunction, reduceGroupFunction)
   }
 
   private def transformToAggregateFunctions(
@@ -317,7 +316,3 @@ object AggregateUtil {
   }
 }
 
-case class AggregateResult(
-    val mapFunc: MapFunction[Any, Any],
-    val reduceGroupFunc: GroupReduceFunction[Row, Row]) {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig
deleted file mode 100644
index ad6a641..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table.test
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithLongKeys(): Unit = {
-    // This uses very long keys to force serialized comparison.
-    // With short keys, the normalized key is sufficient.
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).as('a, 'b, 'c)
-      .groupBy('a, 'b)
-      .select('c.sum)
-
-    val expected = "10\n" + "8\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedByExpression(): Unit = {
-
-    // verify AggregateProjectPullUpConstantsRule
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
-      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
-      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedByExpression2(): Unit = {
-
-    // verify AggregateProjectPullUpConstantsRule
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .select('b, 4 as 'four, 'a)
-      .groupBy('b, 'four)
-      .select('four, 'a.sum)
-
-    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}


Mime
View raw message