flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/3] flink git commit: [FLINK-3503] [tableAPI] Add cost model for DataSet RelNodes to improve plan selection.
Date Wed, 16 Mar 2016 14:35:06 GMT
[FLINK-3503] [tableAPI] Add cost model for DataSet RelNodes to improve plan selection.

This closes #1798


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dfed342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dfed342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dfed342

Branch: refs/heads/tableOnCalcite
Commit: 9dfed3427d930655e2c9e9561acd2758b7798a0b
Parents: 1e66699
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Mar 14 19:57:53 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Wed Mar 16 15:34:42 2016 +0100

----------------------------------------------------------------------
 .../api/table/plan/TranslationContext.scala     |   2 +
 .../flink/api/table/plan/cost/DataSetCost.scala | 150 +++++++++++++++++++
 .../table/plan/cost/DataSetCostFactory.scala    |  47 ++++++
 .../plan/nodes/dataset/DataSetAggregate.scala   |  12 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  23 ++-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  14 +-
 .../table/plan/nodes/dataset/DataSetRel.scala   |  21 +++
 .../plan/nodes/dataset/DataSetSource.scala      |   7 +
 .../table/plan/nodes/dataset/DataSetUnion.scala |  14 +-
 .../api/table/plan/schema/DataSetTable.scala    |   6 +-
 10 files changed, 289 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index 31541ad..9acc7ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+import org.apache.flink.api.table.plan.cost.DataSetCostFactory
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
 object TranslationContext {
@@ -54,6 +55,7 @@ object TranslationContext {
       .newConfigBuilder
       .defaultSchema(tables)
       .parserConfig(parserConfig)
+      .costFactory(new DataSetCostFactory)
       .traitDefs(ConventionTraitDef.INSTANCE)
       .build
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
new file mode 100644
index 0000000..58537dd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.cost
+
+import org.apache.calcite.plan.{RelOptUtil, RelOptCostFactory, RelOptCost}
+import org.apache.calcite.util.Util
+
+/**
+  * A plan cost for DataSet nodes, made up of a row count, a CPU cost, and an IO cost.
+  *
+  * This class is based on Apache Calcite's `org.apache.calcite.plan.volcano.VolcanoCost`
+  * and has an adapted cost comparison method `isLe(other: RelOptCost)` that takes io and cpu
+  * into account: costs are ordered by io first, then by cpu, then by row count.
+  *
+  * Equality is value-based over all three components and is consistent with `hashCode`.
+  *
+  * @param rowCount the estimated number of rows
+  * @param cpu the estimated CPU cost
+  * @param io the estimated IO cost
+  */
+class DataSetCost(val rowCount: Double, val cpu: Double, val io: Double) extends RelOptCost {
+
+  def getCpu: Double = cpu
+
+  /**
+    * Returns true if this is the [[DataSetCost.Infinity]] instance or if any component is
+    * positive infinity.
+    */
+  def isInfinite: Boolean = {
+    (this eq DataSetCost.Infinity) ||
+      (this.rowCount == Double.PositiveInfinity) ||
+      (this.cpu == Double.PositiveInfinity) ||
+      (this.io == Double.PositiveInfinity)
+  }
+
+  def getIo: Double = io
+
+  /**
+    * Returns true if this cost is less than or equal to `other`, comparing io first, then cpu,
+    * then row count. Costs with identical components are less-or-equal to each other.
+    *
+    * @param other another cost; must be a [[DataSetCost]]
+    */
+  def isLe(other: RelOptCost): Boolean = {
+    val that: DataSetCost = other.asInstanceOf[DataSetCost]
+    (this eq that) ||
+      (this.io < that.io) ||
+      (this.io == that.io && this.cpu < that.cpu) ||
+      // `<=` (not `<`) so that two distinct but equal-valued costs satisfy isLe
+      (this.io == that.io && this.cpu == that.cpu && this.rowCount <= that.rowCount)
+  }
+
+  /**
+    * Returns true if this cost is strictly less than `other`, i.e. less than or equal to it
+    * and not value-equal to it.
+    */
+  def isLt(other: RelOptCost): Boolean = {
+    // calls the value-based equals(RelOptCost) overload rather than reference comparison
+    isLe(other) && !this.equals(other)
+  }
+
+  def getRows: Double = rowCount
+
+  override def hashCode: Int = Util.hashCode(rowCount) + Util.hashCode(cpu) + Util.hashCode(io)
+
+  /**
+    * Value-based equality over row count, cpu and io. Overriding `equals(Any)` keeps `==` and
+    * hash-based collections consistent with `hashCode`.
+    */
+  override def equals(other: Any): Boolean = other match {
+    case that: DataSetCost =>
+      (this eq that) ||
+        (this.rowCount == that.rowCount && this.cpu == that.cpu && this.io == that.io)
+    case _ => false
+  }
+
+  /** [[RelOptCost]] equality; delegates to the value-based `equals(Any)`. */
+  def equals(other: RelOptCost): Boolean = equals(other: Any)
+
+  /**
+    * Returns true if every component differs from the corresponding component of `other` by
+    * less than `RelOptUtil.EPSILON`. Returns false if `other` is not a [[DataSetCost]].
+    */
+  def isEqWithEpsilon(other: RelOptCost): Boolean = other match {
+    case that: DataSetCost =>
+      (this eq that) ||
+        ((Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON) &&
+          (Math.abs(this.cpu - that.cpu) < RelOptUtil.EPSILON) &&
+          (Math.abs(this.io - that.io) < RelOptUtil.EPSILON))
+    case _ => false
+  }
+
+  /** Component-wise difference; the Infinity instance stays infinite. */
+  def minus(other: RelOptCost): RelOptCost = {
+    if (this eq DataSetCost.Infinity) {
+      this
+    } else {
+      val that: DataSetCost = other.asInstanceOf[DataSetCost]
+      new DataSetCost(this.rowCount - that.rowCount, this.cpu - that.cpu, this.io - that.io)
+    }
+  }
+
+  /** Scales every component by `factor`; the Infinity instance stays infinite. */
+  def multiplyBy(factor: Double): RelOptCost = {
+    if (this eq DataSetCost.Infinity) {
+      this
+    } else {
+      new DataSetCost(rowCount * factor, cpu * factor, io * factor)
+    }
+  }
+
+  /**
+    * Returns the geometric mean of the component-wise ratios `this / cost`, considering only
+    * components that are non-zero and finite on both sides. Returns 1.0 if none qualify.
+    */
+  def divideBy(cost: RelOptCost): Double = {
+    val that: DataSetCost = cost.asInstanceOf[DataSetCost]
+
+    def usable(x: Double): Boolean = (x != 0) && !x.isInfinite
+
+    val ratios = Seq(
+      (this.rowCount, that.rowCount),
+      (this.cpu, that.cpu),
+      (this.io, that.io)
+    ).collect { case (mine, theirs) if usable(mine) && usable(theirs) => mine / theirs }
+
+    if (ratios.isEmpty) 1.0 else Math.pow(ratios.product, 1.0 / ratios.size)
+  }
+
+  /** Component-wise sum; the result is Infinity if either operand is the Infinity instance. */
+  def plus(other: RelOptCost): RelOptCost = {
+    val that: DataSetCost = other.asInstanceOf[DataSetCost]
+    if ((this eq DataSetCost.Infinity) || (that eq DataSetCost.Infinity)) {
+      DataSetCost.Infinity
+    } else {
+      new DataSetCost(this.rowCount + that.rowCount, this.cpu + that.cpu, this.io + that.io)
+    }
+  }
+
+  override def toString: String = s"{$rowCount rows, $cpu cpu, $io io}"
+
+}
+
+object DataSetCost {
+
+  /** Infinite cost: every component is positive infinity. */
+  private[flink] val Infinity: DataSetCost = new DataSetCost(
+    Double.PositiveInfinity,
+    Double.PositiveInfinity,
+    Double.PositiveInfinity) {
+    override def toString: String = "{inf}"
+  }
+
+  /** Huge but finite cost: every component is `Double.MaxValue`. */
+  private[flink] val Huge: DataSetCost = new DataSetCost(
+    Double.MaxValue,
+    Double.MaxValue,
+    Double.MaxValue) {
+    override def toString: String = "{huge}"
+  }
+
+  /** Zero cost in every component. */
+  private[flink] val Zero: DataSetCost = new DataSetCost(0.0, 0.0, 0.0) {
+    override def toString: String = "{0}"
+  }
+
+  /** Tiny cost: one row, one unit of cpu, and no io. */
+  private[flink] val Tiny: DataSetCost = new DataSetCost(1.0, 1.0, 0.0) {
+    override def toString: String = "{tiny}"
+  }
+
+  /** Shared factory producing [[DataSetCost]] instances. */
+  val FACTORY: RelOptCostFactory = new DataSetCostFactory
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
new file mode 100644
index 0000000..87d57d6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.cost
+
+import org.apache.calcite.plan.{RelOptCost, RelOptCostFactory}
+
+/**
+  * Creates [[DataSetCost]] instances for the planner.
+  *
+  * This class is based on Apache Calcite's `org.apache.calcite.plan.volcano.VolcanoCost#Factory`.
+  * The special huge/infinite/tiny/zero costs are the shared sentinels from the
+  * [[DataSetCost]] companion object.
+  */
+class DataSetCostFactory extends RelOptCostFactory {
+
+  override def makeCost(dRows: Double, dCpu: Double, dIo: Double): RelOptCost =
+    new DataSetCost(dRows, dCpu, dIo)
+
+  override def makeHugeCost: RelOptCost = DataSetCost.Huge
+
+  override def makeInfiniteCost: RelOptCost = DataSetCost.Infinity
+
+  override def makeTinyCost: RelOptCost = DataSetCost.Tiny
+
+  override def makeZeroCost: RelOptCost = DataSetCost.Zero
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index e0d5c26..c917061 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -67,6 +68,15 @@ class DataSetAggregate(
       .item("select", aggregationToString)
   }
 
+  /**
+    * Cost of this aggregation, derived from its single input.
+    *
+    * - Rows: the input row count.
+    * - CPU: input rows times the number of named aggregates.
+    * - IO: input rows times the estimated input row size.
+    */
+  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+    val input = getInput
+    val inputRows = RelMetadataQuery.getRowCount(input)
+    val inputBytes = inputRows * estimateRowSize(input.getRowType)
+    val aggregationCpu = inputRows * namedAggregates.size
+    planner.getCostFactory.makeCost(inputRows, aggregationCpu, inputBytes)
+  }
+
   override def translateToPlan(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 4bc2a0d..d26c1cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptPlanner, RelOptCost, RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -66,6 +67,26 @@ class DataSetCalc(
       .itemIf("where", conditionToString, calcProgram.getCondition != null)
   }
 
+  /**
+    * Cost of this calc.
+    *
+    * - Rows: the input row count.
+    * - CPU: input rows times the number of expressions in the calc program.
+    * - IO: zero.
+    */
+  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+    val inputRows = RelMetadataQuery.getRowCount(getInput)
+    val exprCpu = inputRows * calcProgram.getExprCount
+    planner.getCostFactory.makeCost(inputRows, exprCpu, 0)
+  }
+
+  /**
+    * Estimates the number of rows produced by this calc.
+    *
+    * Without a condition, the input row count is passed through unchanged. With a condition,
+    * the estimate is reduced to 75% of the input row count, so that plans which push filters
+    * down look cheaper. The estimate never drops below one row.
+    */
+  override def getRows: Double = {
+    val rowCnt: Double = RelMetadataQuery.getRowCount(getInput)
+
+    if (calcProgram.getCondition != null) {
+      // we reduce the result cardinality to push filters down;
+      // `max` keeps a floor of one row instead of capping the estimate at one row
+      (rowCnt * 0.75).max(1.0)
+    } else {
+      rowCnt
+    }
+  }
+
   override def translateToPlan(config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 08e2039..38a70fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelTraitSet, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinInfo
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.calcite.util.mapping.IntPair
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
@@ -82,6 +83,17 @@ class DataSetJoin(
       .item("join", joinSelectionToString)
   }
 
+  /**
+    * Cost of this join: the sum over both inputs of a per-input cost.
+    *
+    * - Rows: the input's row count.
+    * - CPU: one unit per input row.
+    * - IO: input rows times the input's estimated row size.
+    */
+  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+    val costFactory = planner.getCostFactory
+
+    this.getInputs.foldLeft(costFactory.makeCost(0, 0, 0)) { (acc, input) =>
+      val inputRows = RelMetadataQuery.getRowCount(input)
+      val ioCost = inputRows * estimateRowSize(input.getRowType)
+      acc.plus(costFactory.makeCost(inputRows, inputRows, ioCost))
+    }
+  }
+
   override def translateToPlan(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index d708048..502e362 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,7 +19,9 @@
 package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
@@ -66,5 +68,24 @@ trait DataSetRel extends RelNode {
     }
   }
 
+  /**
+    * Estimates the size of a row of the given type as the sum of a fixed per-field size chosen
+    * by each field's SQL type. These are rough byte-size approximations used only for cost
+    * estimation.
+    *
+    * @param rowType the row type whose size is estimated
+    * @return the estimated row size
+    * @throws IllegalArgumentException if a field has a SQL type without a size estimate
+    */
+  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+
+    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
+      t match {
+        case SqlTypeName.TINYINT => s + 1
+        case SqlTypeName.SMALLINT => s + 2
+        case SqlTypeName.INTEGER => s + 4
+        case SqlTypeName.BIGINT => s + 8
+        case SqlTypeName.BOOLEAN => s + 1
+        case SqlTypeName.FLOAT => s + 4
+        case SqlTypeName.DOUBLE => s + 8
+        case SqlTypeName.VARCHAR => s + 12
+        case SqlTypeName.CHAR => s + 1
+        // rough estimates so that plans with these common types do not abort cost estimation
+        case SqlTypeName.DECIMAL => s + 12
+        case SqlTypeName.DATE | SqlTypeName.TIME | SqlTypeName.TIMESTAMP => s + 12
+        case _ => throw new IllegalArgumentException(s"Unsupported data type encountered: $t")
+      }
+    }
+
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index 18537ff..4e20ab9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -22,6 +22,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
@@ -62,6 +63,12 @@ class DataSetSource(
     )
   }
 
+  /**
+    * Cost of scanning this source: its row count as both the row and CPU components,
+    * with zero IO.
+    */
+  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+    val sourceRows: Double = RelMetadataQuery.getRowCount(this)
+    val factory = planner.getCostFactory
+    factory.makeCost(sourceRows, sourceRows, 0)
+  }
+
   override def translateToPlan(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]])

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index c3bda89..8f22285 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -18,14 +18,16 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 /**
 * Flink RelNode which matches along with UnionOperator.
@@ -56,6 +58,16 @@ class DataSetUnion(
     super.explainTerms(pw).item("union", unionSelectionToString)
   }
 
+  /**
+    * Cost of this union: the total row count of all inputs, with zero CPU and zero IO.
+    */
+  override def computeSelfCost(planner: RelOptPlanner): RelOptCost = {
+    val totalRows = getInputs.asScala
+      .map(input => RelMetadataQuery.getRowCount(input): Double)
+      .sum
+
+    planner.getCostFactory.makeCost(totalRows, 0, 0)
+  }
+
   override def translateToPlan(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfed342/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
index b33f66d..ff371f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
@@ -76,9 +76,9 @@ class DataSetTable[T](
     builder.build
   }
 
-//  override def getStatistic: Statistic = {
-//    new DefaultDataSetStatistic
-//  }
+  /** Returns a new `DefaultDataSetStatistic` describing this table. */
+  override def getStatistic: Statistic = new DefaultDataSetStatistic
 
 }
 


Mime
View raw message