spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval
Date Thu, 06 Jul 2017 20:58:29 GMT
Repository: spark
Updated Branches:
  refs/heads/master 48e44b24a -> bf66335ac


[SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval

## What changes were proposed in this pull request?

Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to ValueInterval.
The current naming is identical to logical operator "range".
Refactoring it to ValueInterval is more accurate.

## How was this patch tested?

unit test

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #18549 from gengliangwang/ValueInterval.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf66335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf66335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf66335a

Branch: refs/heads/master
Commit: bf66335acab3c0c188f6c378eb8aa6948a259cb2
Parents: 48e44b2
Author: Wang Gengliang <ltnwgl@gmail.com>
Authored: Thu Jul 6 13:58:27 2017 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Jul 6 13:58:27 2017 -0700

----------------------------------------------------------------------
 .../statsEstimation/FilterEstimation.scala      | 36 ++++----
 .../statsEstimation/JoinEstimation.scala        | 14 +--
 .../plans/logical/statsEstimation/Range.scala   | 88 -------------------
 .../logical/statsEstimation/ValueInterval.scala | 91 ++++++++++++++++++++
 4 files changed, 117 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 5a3bee7..e13db85 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -316,8 +316,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // decide if the value is in [min, max] of the column.
     // We currently don't store min/max for binary/string type.
     // Hence, we assume it is in boundary for binary/string type.
-    val statsRange = Range(colStat.min, colStat.max, attr.dataType)
-    if (statsRange.contains(literal)) {
+    val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType)
+    if (statsInterval.contains(literal)) {
       if (update) {
         // We update ColumnStat structure after apply this equality predicate:
         // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal
@@ -388,9 +388,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        val statsRange = Range(colStat.min, colStat.max, dataType).asInstanceOf[NumericRange]
+        val statsInterval =
+          ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
         val validQuerySet = hSet.filter { v =>
-          v != null && statsRange.contains(Literal(v, dataType))
+          v != null && statsInterval.contains(Literal(v, dataType))
         }
 
         if (validQuerySet.isEmpty) {
@@ -440,12 +441,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
       update: Boolean): Option[BigDecimal] = {
 
     val colStat = colStatsMap(attr)
-    val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
-    val max = statsRange.max.toBigDecimal
-    val min = statsRange.min.toBigDecimal
+    val statsInterval =
+      ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
+    val max = statsInterval.max.toBigDecimal
+    val min = statsInterval.min.toBigDecimal
     val ndv = BigDecimal(colStat.distinctCount)
 
-    // determine the overlapping degree between predicate range and column's range
+    // determine the overlapping degree between predicate interval and column's interval
     val numericLiteral = if (literal.dataType == BooleanType) {
       if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
     } else {
@@ -566,18 +568,18 @@ case class FilterEstimation(plan: Filter) extends Logging {
     }
 
     val colStatLeft = colStatsMap(attrLeft)
-    val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
-      .asInstanceOf[NumericRange]
-    val maxLeft = statsRangeLeft.max
-    val minLeft = statsRangeLeft.min
+    val statsIntervalLeft = ValueInterval(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
+      .asInstanceOf[NumericValueInterval]
+    val maxLeft = statsIntervalLeft.max
+    val minLeft = statsIntervalLeft.min
 
     val colStatRight = colStatsMap(attrRight)
-    val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
-      .asInstanceOf[NumericRange]
-    val maxRight = statsRangeRight.max
-    val minRight = statsRangeRight.min
+    val statsIntervalRight = ValueInterval(colStatRight.min, colStatRight.max, attrRight.dataType)
+      .asInstanceOf[NumericValueInterval]
+    val maxRight = statsIntervalRight.max
+    val minRight = statsIntervalRight.min
 
-    // determine the overlapping degree between predicate range and column's range
+    // determine the overlapping degree between predicate interval and column's interval
     val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
     val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
       // Left < Right or Left <= Right

http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f481969..dcbe36d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -175,9 +175,9 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // Check if the two sides are disjoint
       val leftKeyStats = leftStats.attributeStats(leftKey)
       val rightKeyStats = rightStats.attributeStats(rightKey)
-      val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
-      val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
-      if (Range.isIntersected(lRange, rRange)) {
+      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+      if (ValueInterval.isIntersected(lInterval, rInterval)) {
         // Get the largest ndv among pairs of join keys
         val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
         if (maxNdv > ndvDenom) ndvDenom = maxNdv
@@ -239,16 +239,16 @@ case class InnerOuterEstimation(join: Join) extends Logging {
     joinKeyPairs.foreach { case (leftKey, rightKey) =>
       val leftKeyStats = leftStats.attributeStats(leftKey)
       val rightKeyStats = rightStats.attributeStats(rightKey)
-      val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
-      val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
       // When we reach here, join selectivity is not zero, so each pair of join keys should be
       // intersected.
-      assert(Range.isIntersected(lRange, rRange))
+      assert(ValueInterval.isIntersected(lInterval, rInterval))
 
       // Update intersected column stats
       assert(leftKey.dataType.sameType(rightKey.dataType))
       val newNdv = leftKeyStats.distinctCount.min(rightKeyStats.distinctCount)
-      val (newMin, newMax) = Range.intersect(lRange, rRange, leftKey.dataType)
+      val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
       val newMaxLen = math.min(leftKeyStats.maxLen, rightKeyStats.maxLen)
       val newAvgLen = (leftKeyStats.avgLen + rightKeyStats.avgLen) / 2
       val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
deleted file mode 100644
index 4ac5ba5..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.types._
-
-
-/** Value range of a column. */
-trait Range {
-  def contains(l: Literal): Boolean
-}
-
-/** For simplicity we use decimal to unify operations of numeric ranges. */
-case class NumericRange(min: Decimal, max: Decimal) extends Range {
-  override def contains(l: Literal): Boolean = {
-    val lit = EstimationUtils.toDecimal(l.value, l.dataType)
-    min <= lit && max >= lit
-  }
-}
-
-/**
- * This version of Spark does not have min/max for binary/string types, we define their default
- * behaviors by this class.
- */
-class DefaultRange extends Range {
-  override def contains(l: Literal): Boolean = true
-}
-
-/** This is for columns with only null values. */
-class NullRange extends Range {
-  override def contains(l: Literal): Boolean = false
-}
-
-object Range {
-  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
-    case StringType | BinaryType => new DefaultRange()
-    case _ if min.isEmpty || max.isEmpty => new NullRange()
-    case _ =>
-      NumericRange(
-        min = EstimationUtils.toDecimal(min.get, dataType),
-        max = EstimationUtils.toDecimal(max.get, dataType))
-  }
-
-  def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
-    case (_, _: DefaultRange) | (_: DefaultRange, _) =>
-      // The DefaultRange represents string/binary types which do not have max/min stats,
-      // we assume they are intersected to be conservative on estimation
-      true
-    case (_, _: NullRange) | (_: NullRange, _) =>
-      false
-    case (n1: NumericRange, n2: NumericRange) =>
-      n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
-  }
-
-  /**
-   * Intersected results of two ranges. This is only for two overlapped ranges.
-   * The outputs are the intersected min/max values.
-   */
-  def intersect(r1: Range, r2: Range, dt: DataType): (Option[Any], Option[Any]) = {
-    (r1, r2) match {
-      case (_, _: DefaultRange) | (_: DefaultRange, _) =>
-        // binary/string types don't support intersecting.
-        (None, None)
-      case (n1: NumericRange, n2: NumericRange) =>
-        // Choose the maximum of two min values, and the minimum of two max values.
-        val newMin = if (n1.min <= n2.min) n2.min else n1.min
-        val newMax = if (n1.max <= n2.max) n1.max else n2.max
-        (Some(EstimationUtils.fromDecimal(newMin, dt)),
-          Some(EstimationUtils.fromDecimal(newMax, dt)))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
new file mode 100644
index 0000000..0caaf79
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types._
+
+
+/** Value range of a column. */
+trait ValueInterval {
+  def contains(l: Literal): Boolean
+}
+
+/** For simplicity we use decimal to unify operations of numeric intervals. */
+case class NumericValueInterval(min: Decimal, max: Decimal) extends ValueInterval {
+  override def contains(l: Literal): Boolean = {
+    val lit = EstimationUtils.toDecimal(l.value, l.dataType)
+    min <= lit && max >= lit
+  }
+}
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we define their default
+ * behaviors by this class.
+ */
+class DefaultValueInterval extends ValueInterval {
+  override def contains(l: Literal): Boolean = true
+}
+
+/** This is for columns with only null values. */
+class NullValueInterval extends ValueInterval {
+  override def contains(l: Literal): Boolean = false
+}
+
+object ValueInterval {
+  def apply(
+      min: Option[Any],
+      max: Option[Any],
+      dataType: DataType): ValueInterval = dataType match {
+    case StringType | BinaryType => new DefaultValueInterval()
+    case _ if min.isEmpty || max.isEmpty => new NullValueInterval()
+    case _ =>
+      NumericValueInterval(
+        min = EstimationUtils.toDecimal(min.get, dataType),
+        max = EstimationUtils.toDecimal(max.get, dataType))
+  }
+
+  def isIntersected(r1: ValueInterval, r2: ValueInterval): Boolean = (r1, r2) match {
+    case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+      // The DefaultValueInterval represents string/binary types which do not have max/min stats,
+      // we assume they are intersected to be conservative on estimation
+      true
+    case (_, _: NullValueInterval) | (_: NullValueInterval, _) =>
+      false
+    case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+      n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
+  }
+
+  /**
+   * Intersected results of two intervals. This is only for two overlapped intervals.
+   * The outputs are the intersected min/max values.
+   */
+  def intersect(r1: ValueInterval, r2: ValueInterval, dt: DataType): (Option[Any], Option[Any]) = {
+    (r1, r2) match {
+      case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+        // binary/string types don't support intersecting.
+        (None, None)
+      case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+        // Choose the maximum of two min values, and the minimum of two max values.
+        val newMin = if (n1.min <= n2.min) n2.min else n1.min
+        val newMax = if (n1.max <= n2.max) n1.max else n2.max
+        (Some(EstimationUtils.fromDecimal(newMin, dt)),
+          Some(EstimationUtils.fromDecimal(newMax, dt)))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message