spark-commits mailing list archives

From: r...@apache.org
Subject: spark git commit: [SPARK-15459][SQL] Make Range logical and physical explain consistent
Date: Sun, 22 May 2016 07:03:40 GMT
Repository: spark
Updated Branches:
  refs/heads/master a11175eec -> 845e447fa


[SPARK-15459][SQL] Make Range logical and physical explain consistent

## What changes were proposed in this pull request?
This patch simplifies the implementation of the Range operator and makes the explain string
consistent between the logical plan and the physical plan. To do this, I changed RangeExec to
embed a logical Range plan in it.
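
The core of the change, as a minimal standalone sketch (simplified names and members for
illustration; the real classes are in the diff below):
```
// Illustrative only: the physical node wraps the logical one and delegates its
// parameters and explain string, so both plans print the same way.
case class Range(start: Long, end: Long, step: Long, numSlices: Int) {
  def simpleString: String =
    if (step == 1) s"Range ($start, $end, splits=$numSlices)"
    else s"Range ($start, $end, step=$step, splits=$numSlices)"
}

case class RangeExec(range: Range) {
  def start: Long = range.start                   // parameters come from the logical node
  def step: Long = range.step
  def simpleString: String = range.simpleString   // identical explain output by construction
}

val r = Range(0, 100, 2, 2)
assert(RangeExec(r).simpleString == r.simpleString)
```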

Before this patch (note that the logical Range and physical Range actually output different
information):
```
== Optimized Logical Plan ==
Range 0, 100, 2, 2, [id#8L]

== Physical Plan ==
*Range 0, 2, 2, 50, [id#8L]
```

After this patch:
If step size is 1:
```
== Optimized Logical Plan ==
Range (0, 100, splits=2)

== Physical Plan ==
*Range (0, 100, splits=2)
```

If step size is not 1:
```
== Optimized Logical Plan ==
Range (0, 100, step=2, splits=2)

== Physical Plan ==
*Range (0, 100, step=2, splits=2)
```
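
To reproduce these strings, one can run something like the following (assuming a SparkSession
named `spark` on a build that includes this patch; the signature used here is
`range(start, end, step, numPartitions)`):
```
spark.range(0, 100, 1, 2).explain(true)  // step == 1: Range (0, 100, splits=2)
spark.range(0, 100, 2, 2).explain(true)  // step != 1: Range (0, 100, step=2, splits=2)
```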

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #13239 from rxin/SPARK-15459.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/845e447f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/845e447f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/845e447f

Branch: refs/heads/master
Commit: 845e447fa03bf0a53ed79fa7e240af94dc152d2c
Parents: a11175e
Author: Reynold Xin <rxin@databricks.com>
Authored: Sun May 22 00:03:37 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sun May 22 00:03:37 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 18 +++++++++++---
 .../catalyst/catalog/SessionCatalogSuite.scala  | 16 ++++++------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../sql/execution/basicPhysicalOperators.scala  | 26 +++++++++-----------
 .../spark/sql/internal/CatalogSuite.scala       |  2 +-
 5 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bed48b6..b1b3e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -431,8 +431,11 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Int,
-    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
-  require(step != 0, "step cannot be 0")
+    output: Seq[Attribute])
+  extends LeafNode with MultiInstanceRelation {
+
+  require(step != 0, s"step ($step) cannot be 0")
+
   val numElements: BigInt = {
     val safeStart = BigInt(start)
     val safeEnd = BigInt(end)
@@ -444,13 +447,20 @@ case class Range(
     }
   }
 
-  override def newInstance(): Range =
-    Range(start, end, step, numSlices, output.map(_.newInstance()))
+  override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
   override def statistics: Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
   }
+
+  override def simpleString: String = {
+    if (step == 1) {
+      s"Range ($start, $end, splits=$numSlices)"
+    } else {
+      s"Range ($start, $end, step=$step, splits=$numSlices)"
+    }
+  }
 }
 
 case class Aggregate(

http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 91e2e07..a4dc03c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("create temp table") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
-    val tempTable2 = Range(1, 20, 2, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
+    val tempTable2 = Range(1, 20, 2, 10)
     catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
     assert(catalog.getTempTable("tbl1") == Option(tempTable1))
@@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("rename temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
@@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("lookup table relation") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
     val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
     sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
@@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
-    val tempTable = Range(1, 10, 1, 10, Seq())
+    val tempTable = Range(1, 10, 1, 10)
     catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
@@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1").toSet ==
@@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables with pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 664e7f5..555a2f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan]
{
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
-      case r @ logical.Range(start, end, step, numSlices, output) =>
-        execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil
+      case r : logical.Range =>
+        execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d492fa7..89bde6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
@@ -305,22 +305,18 @@ case class SampleExec(
 
 
 /**
- * Physical plan for range (generating a range of 64 bit numbers.
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
+ * Physical plan for range (generating a range of 64 bit numbers).
  */
-case class RangeExec(
-    start: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    output: Seq[Attribute])
+case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   extends LeafExecNode with CodegenSupport {
 
+  def start: Long = range.start
+  def step: Long = range.step
+  def numSlices: Int = range.numSlices
+  def numElements: BigInt = range.numElements
+
+  override val output: Seq[Attribute] = range.output
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -458,6 +454,8 @@ case class RangeExec(
         }
       }
   }
+
+  override def simpleString: String = range.simpleString
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index e4d4cec..cd434f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -58,7 +58,7 @@ class CatalogSuite
   }
 
   private def createTempTable(name: String): Unit = {
-    sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+    sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true)
   }
 
   private def dropTable(name: String, db: Option[String] = None): Unit = {

