flink-commits mailing list archives

From va...@apache.org
Subject [22/50] [abbrv] flink git commit: [FLINK-3463] implement calc translation
Date Fri, 18 Mar 2016 13:48:16 GMT
[FLINK-3463] implement calc translation

- remove FlinkFilter, FlinkProject and associated rules

- deactivate ReduceExpressionsRule.FILTER_INSTANCE and ReduceExpressionsRule.PROJECT_INSTANCE
  (covered by ReduceExpressionsRule.CALC_INSTANCE)

This closes #1696
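
For reference, a minimal sketch of the kind of query this change affects (not part of the commit; the object name is hypothetical and the imports assume the 2016-era Scala Table API used in CalcITCase below). With this commit the where/select pair is merged by Calcite into a single Calc node, which DataSetCalcRule translates into one code-generated FlatMapFunction (a DataSetFlatMap) instead of separate filter and projection operators.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    // Hypothetical example object, not taken from the commit.
    object CalcTranslationSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // Filter and projection on the same input: after this change both are
        // planned as a single Calc and executed by one generated FlatMapFunction.
        val t = env.fromElements((1, "Hi"), (2, "Hello"), (3, "Hello world")).toTable
          .where('_1 < 3)
          .select('_1, '_2)
        t.toDataSet[Row].print()
      }
    }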


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e3f0762
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e3f0762
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e3f0762

Branch: refs/heads/master
Commit: 3e3f07629e8d3f39090282bb279484d3dce85a30
Parents: 10d3a31
Author: vasia <vasia@apache.org>
Authored: Mon Feb 22 15:06:37 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:50 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/dataset/DataSetFlatMap.scala     |   2 +-
 .../table/plan/nodes/dataset/DataSetMap.scala   |  75 --------------
 .../table/plan/nodes/logical/FlinkFilter.scala  |  42 --------
 .../table/plan/nodes/logical/FlinkProject.scala |  45 --------
 .../api/table/plan/rules/FlinkRuleSets.scala    |   6 --
 .../plan/rules/dataset/DataSetCalcRule.scala    |  87 +++++++++++++++-
 .../plan/rules/dataset/DataSetFilterRule.scala  | 103 -------------------
 .../plan/rules/dataset/DataSetProjectRule.scala |  89 ----------------
 .../plan/rules/logical/FlinkFilterRule.scala    |  50 ---------
 .../plan/rules/logical/FlinkProjectRule.scala   |  51 ---------
 .../flink/api/scala/table/test/CalcITCase.scala | 101 ++++++++++++++++++
 11 files changed, 188 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
index 9744792..da49ec8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
@@ -62,7 +62,7 @@ class DataSetFlatMap(
 
   override def translateToPlan(config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
     val returnType = determineReturnType(
       getRowType,
       expectedType,

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
deleted file mode 100644
index f4f8afb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.plan.TypeConverter.determineReturnType
-
-/**
-  * Flink RelNode which matches along with MapOperator.
-  */
-class DataSetMap(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowType: RelDataType,
-    opName: String,
-    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => MapFunction[Any, Any])
-  extends SingleRel(cluster, traitSet, input)
-  with DataSetRel {
-
-  override def deriveRowType() = rowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetMap(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      rowType,
-      opName,
-      func
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("name", opName)
-  }
-
-  override def toString = opName
-
-  override def translateToPlan(
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]])
-    : DataSet[Any] = {
-    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-    val mapFunc = func.apply(config, inputDataSet.getType, returnType)
-    inputDataSet.map(mapFunc)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
deleted file mode 100644
index 9f0bf30..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkFilter.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.Filter
-import org.apache.calcite.rex.RexNode
-
-class FlinkFilter(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    condition: RexNode)
-  extends Filter(cluster, traitSet, input, condition)
-  with FlinkRel {
-
-  override def copy(traitSet: RelTraitSet, input: RelNode, condition: RexNode): Filter = {
-    new FlinkFilter(
-      cluster,
-      traitSet,
-      input,
-      condition
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
deleted file mode 100644
index 1d93036..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkProject.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.logical
-
-import java.util
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Project
-import org.apache.calcite.rex.RexNode
-
-class FlinkProject(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    projects: java.util.List[RexNode],
-    rowType: RelDataType)
-  extends Project(cluster, traitSet, input, projects, rowType)
-  with FlinkRel {
-
-  override def copy(
-      traitSet: RelTraitSet,
-      input: RelNode,
-      projects: util.List[RexNode],
-      rowType: RelDataType): Project = {
-    new FlinkProject(cluster, traitSet, input, projects, rowType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index ac52b48..32d9f0d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -76,9 +76,7 @@ object FlinkRuleSets {
 
     // simplify expressions rules
     ReduceExpressionsRule.CALC_INSTANCE,
-    ReduceExpressionsRule.FILTER_INSTANCE,
     ReduceExpressionsRule.JOIN_INSTANCE,
-    ReduceExpressionsRule.PROJECT_INSTANCE,
 
     // prune empty results rules
     PruneEmptyRules.AGGREGATE_INSTANCE,
@@ -99,9 +97,7 @@ object FlinkRuleSets {
     // translate to logical Flink nodes
     FlinkAggregateRule.INSTANCE,
     FlinkCalcRule.INSTANCE,
-    FlinkFilterRule.INSTANCE,
     FlinkJoinRule.INSTANCE,
-    FlinkProjectRule.INSTANCE,
     FlinkScanRule.INSTANCE,
     FlinkUnionRule.INSTANCE
   )
@@ -111,9 +107,7 @@ object FlinkRuleSets {
     // translate to DataSet nodes
     DataSetAggregateRule.INSTANCE,
     DataSetCalcRule.INSTANCE,
-    DataSetFilterRule.INSTANCE,
     DataSetJoinRule.INSTANCE,
-    DataSetProjectRule.INSTANCE,
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE
   )

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
index 85e090c..256a085 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala
@@ -23,6 +23,14 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, FlinkConvention}
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.common.functions.FlatMapFunction
+import scala.collection.JavaConversions._
+import org.apache.calcite.rex.RexLocalRef
+import org.apache.flink.api.table.codegen.GeneratedExpression
 
 class DataSetCalcRule
   extends ConverterRule(
@@ -37,13 +45,90 @@ class DataSetCalcRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
 
+    val calcFunc = (
+        config: TableConfig,
+        inputType: TypeInformation[Any],
+        returnType: TypeInformation[Any]) => {
+      val generator = new CodeGenerator(config, inputType)
+
+      val calcProgram = calc.getProgram
+      val condition = calcProgram.getCondition
+      val expandedExpressions = calcProgram.getProjectList.map(
+          expr => calcProgram.expandLocalRef(expr.asInstanceOf[RexLocalRef]))
+      val projection = generator.generateResultExpression(
+        returnType,
+        calc.getRowType.getFieldNames,
+        expandedExpressions)
+      
+      val body = {
+        // only projection
+        if (condition == null) {
+          s"""
+            |${projection.code}
+            |${generator.collectorTerm}.collect(${projection.resultTerm});
+            |""".stripMargin
+        }
+        else {
+          val filterCondition = generator.generateExpression(
+              calcProgram.expandLocalRef(calcProgram.getCondition))
+          // only filter
+          if (projection == null) {
+            // conversion
+            if (inputType != returnType) {
+              val conversion = generator.generateConverterResultExpression(
+                  returnType,
+                  calc.getRowType.getFieldNames)
+
+                  s"""
+                    |${filterCondition.code}
+                    |if (${filterCondition.resultTerm}) {
+                    |  ${conversion.code}
+                    |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+                    |}
+                    |""".stripMargin
+            }
+            // no conversion
+            else {
+              s"""
+                |${filterCondition.code}
+                |if (${filterCondition.resultTerm}) {
+                |  ${generator.collectorTerm}.collect(${generator.input1Term});
+                |}
+                |""".stripMargin
+            }
+          }
+          // both filter and projection
+          else {
+            s"""
+              |${filterCondition.code}
+              |if (${filterCondition.resultTerm}) {
+              |  ${projection.code}
+              |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+              |}
+              |""".stripMargin
+          }
+        }
+      }
+
+      val genFunction = generator.generateFunction(
+        description,
+        classOf[FlatMapFunction[Any, Any]],
+        body,
+        returnType)
+
+      new FlatMapRunner[Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType)
+    }
+
     new DataSetFlatMap(
       rel.getCluster,
       traitSet,
       convInput,
       rel.getRowType,
       calc.toString,
-      null)
+      calcFunc)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
deleted file mode 100644
index bec4d1f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkFilter}
-import org.apache.flink.api.table.runtime.FlatMapRunner
-
-import scala.collection.JavaConversions._
-
-class DataSetFilterRule
-  extends ConverterRule(
-    classOf[FlinkFilter],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetFilterRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val filter: FlinkFilter = rel.asInstanceOf[FlinkFilter]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(filter.getInput, DataSetConvention.INSTANCE)
-
-    val func = (
-        config: TableConfig,
-        inputType: TypeInformation[Any],
-        returnType: TypeInformation[Any]) => {
-      val generator = new CodeGenerator(config, inputType)
-
-      val condition = generator.generateExpression(filter.getCondition)
-
-      // conversion
-      val body = if (inputType != returnType) {
-        val conversion = generator.generateConverterResultExpression(
-          returnType,
-          filter.getRowType.getFieldNames)
-
-        s"""
-          |${condition.code}
-          |if (${condition.resultTerm}) {
-          |  ${conversion.code}
-          |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-          |}
-          |""".stripMargin
-      }
-      // no conversion
-      else {
-        s"""
-          |${condition.code}
-          |if (${condition.resultTerm}) {
-          |  ${generator.collectorTerm}.collect(${generator.input1Term});
-          |}
-          |""".stripMargin
-      }
-
-      val genFunction = generator.generateFunction(
-        description,
-        classOf[FlatMapFunction[Any, Any]],
-        body,
-        returnType)
-
-      new FlatMapRunner[Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType)
-    }
-
-    new DataSetFlatMap(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      rel.getRowType,
-      filter.toString,
-      func)
-  }
-}
-
-object DataSetFilterRule {
-  val INSTANCE: RelOptRule = new DataSetFilterRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
deleted file mode 100644
index 5b1c763..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataset
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkProject}
-import org.apache.flink.api.table.runtime.MapRunner
-
-import scala.collection.JavaConversions._
-
-class DataSetProjectRule
-  extends ConverterRule(
-    classOf[FlinkProject],
-    FlinkConvention.INSTANCE,
-    DataSetConvention.INSTANCE,
-    "DataSetProjectRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-    val proj: FlinkProject = rel.asInstanceOf[FlinkProject]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(proj.getInput, DataSetConvention.INSTANCE)
-
-    val func = (
-        config: TableConfig,
-        inputType: TypeInformation[Any],
-        returnType: TypeInformation[Any]) => {
-      val generator = new CodeGenerator(config, inputType)
-
-      // projection and implicit type conversion
-      val projection = generator.generateResultExpression(
-        returnType,
-        proj.getRowType.getFieldNames,
-        proj.getProjects)
-
-      val body =
-        s"""
-          |${projection.code}
-          |return ${projection.resultTerm};
-          |""".stripMargin
-
-      val genFunction = generator.generateFunction(
-        description,
-        classOf[MapFunction[Any, Any]],
-        body,
-        returnType)
-
-      new MapRunner[Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType)
-    }
-
-    new DataSetMap(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      rel.getRowType,
-      proj.toString,
-      func)
-  }
-}
-
-object DataSetProjectRule {
-  val INSTANCE: RelOptRule = new DataSetProjectRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
deleted file mode 100644
index 25df8e8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkFilterRule.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalFilter
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, FlinkConvention}
-
-class FlinkFilterRule
-  extends ConverterRule(
-      classOf[LogicalFilter],
-      Convention.NONE,
-      FlinkConvention.INSTANCE,
-      "FlinkFilterRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-      val filter: LogicalFilter = rel.asInstanceOf[LogicalFilter]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(filter.getInput, FlinkConvention.INSTANCE)
-
-      new FlinkFilter(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        filter.getCondition)
-    }
-  }
-
-object FlinkFilterRule {
-  val INSTANCE: RelOptRule = new FlinkFilterRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
deleted file mode 100644
index d0e1410..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkProjectRule.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.logical
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, FlinkConvention}
-
-class FlinkProjectRule
-  extends ConverterRule(
-      classOf[LogicalProject],
-      Convention.NONE,
-      FlinkConvention.INSTANCE,
-      "FlinkProjectRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-      val proj: LogicalProject = rel.asInstanceOf[LogicalProject]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(proj.getInput, FlinkConvention.INSTANCE)
-
-      new FlinkProject(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        proj.getProjects,
-        proj.getRowType)
-    }
-  }
-
-object FlinkProjectRule {
-  val INSTANCE: RelOptRule = new FlinkProjectRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e3f0762/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
new file mode 100644
index 0000000..ebbecb8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CalcITCase.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.Row
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.test.TableProgramsTestBase
+import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+
+@RunWith(classOf[Parameterized])
+class CalcITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSimpleCalc(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+        .select('_1, '_2, '_3)
+        .where('_1 < 7)
+        .select('_1, '_3)
+
+    val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+      "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCalcWithTwoFilters(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+        .select('_1, '_2, '_3)
+        .where('_1 < 7 && '_2 === 3)
+        .select('_1, '_3)
+        .where('_1 === 4)
+        .select('_1)
+
+    val expected = "4\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCalcWithAggregation(): Unit = {
+    
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable
+        .select('_1, '_2, '_3)
+        .where('_1 < 15)
+        .groupBy('_2)
+        .select('_1.min, '_2.count as 'cnt)
+        .where('cnt > 3)
+
+    val expected = "7,4\n" + "11,4\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testCalcJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
+      .where('b > 1).select('a, 'd).where('d === 2)
+
+    val expected = "2,2\n" + "3,2\n"
+    val results = joinT.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}

