From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-15459][SQL] Make Range logical and physical explain consistent
Date: Sun, 22 May 2016 07:03:40 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master a11175eec -> 845e447fa


[SPARK-15459][SQL] Make Range logical and physical explain consistent

## What changes were proposed in this pull request?
This patch simplifies the implementation of the Range operator and makes the explain string consistent between the logical plan and the physical plan. To do this, I changed RangeExec to embed a Range logical plan in it.

Before this patch (note that the logical Range and the physical Range output different information):
```
== Optimized Logical Plan ==
Range 0, 100, 2, 2, [id#8L]

== Physical Plan ==
*Range 0, 2, 2, 50, [id#8L]
```

After this patch, if the step size is 1:
```
== Optimized Logical Plan ==
Range (0, 100, splits=2)

== Physical Plan ==
*Range (0, 100, splits=2)
```

If the step size is not 1:
```
== Optimized Logical Plan ==
Range (0, 100, step=2, splits=2)

== Physical Plan ==
*Range (0, 100, step=2, splits=2)
```

## How was this patch tested?
N/A

Author: Reynold Xin

Closes #13239 from rxin/SPARK-15459.
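For reference, the explain strings above can be reproduced in a few lines; a minimal sketch, assuming a local Spark 2.0-style SparkSession (the app name and master are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any SparkSession built against this commit works.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("range-explain")
  .getOrCreate()

// start = 0, end = 100, step = 2, numPartitions = 2, matching the examples above.
// explain(true) prints the logical and physical plans; with this patch both
// should render the range as "Range (0, 100, step=2, splits=2)".
spark.range(0, 100, 2, 2).explain(true)
```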
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/845e447f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/845e447f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/845e447f

Branch: refs/heads/master
Commit: 845e447fa03bf0a53ed79fa7e240af94dc152d2c
Parents: a11175e
Author: Reynold Xin
Authored: Sun May 22 00:03:37 2016 -0700
Committer: Reynold Xin
Committed: Sun May 22 00:03:37 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 18 +++++++++++---
 .../catalyst/catalog/SessionCatalogSuite.scala  | 16 ++++++------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../sql/execution/basicPhysicalOperators.scala  | 26 +++++++++-----------
 .../spark/sql/internal/CatalogSuite.scala       |  2 +-
 5 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bed48b6..b1b3e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -431,8 +431,11 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Int,
-    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
-  require(step != 0, "step cannot be 0")
+    output: Seq[Attribute])
+  extends LeafNode with MultiInstanceRelation {
+
+  require(step != 0, s"step ($step) cannot be 0")
+
   val numElements: BigInt = {
     val safeStart = BigInt(start)
     val safeEnd = BigInt(end)
@@ -444,13 +447,20 @@ case class Range(
     }
   }
 
-  override def newInstance(): Range =
-    Range(start, end, step, numSlices, output.map(_.newInstance()))
+  override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
   override def statistics: Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
   }
+
+  override def simpleString: String = {
+    if (step == 1) {
+      s"Range ($start, $end, splits=$numSlices)"
+    } else {
+      s"Range ($start, $end, step=$step, splits=$numSlices)"
+    }
+  }
 }
 
 case class Aggregate(
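The `numElements` computation above is cut off by the hunk boundary; the idea is to count the range's elements in BigInt so that extreme bounds cannot overflow a Long. A standalone sketch of the same idea (not the exact Spark code; the clamp to zero for empty ranges is an assumption here):

```scala
// Number of values produced by iterating from start until end by step,
// computed in BigInt to avoid Long overflow. Equivalent to taking the
// ceiling of (end - start) / step in the direction of the step, then
// clamping negative results (empty ranges) to zero.
def numElements(start: Long, end: Long, step: Long): BigInt = {
  require(step != 0, s"step ($step) cannot be 0")
  val span = BigInt(end) - BigInt(start)
  // Ceiling division toward the step's direction: add (step - sign(step)).
  val count = (span + step - (if (step > 0) 1 else -1)) / step
  count.max(0)
}

numElements(0, 100, 2)                        // 50
numElements(Long.MinValue, Long.MaxValue, 1)  // 2^64 - 1, not representable as a Long
```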
http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 91e2e07..a4dc03c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("create temp table") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
-    val tempTable2 = Range(1, 20, 2, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
+    val tempTable2 = Range(1, 20, 2, 10)
     catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
     assert(catalog.getTempTable("tbl1") == Option(tempTable1))
@@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("rename temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
@@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("lookup table relation") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
     val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
     sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
@@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
-    val tempTable = Range(1, 10, 1, 10, Seq())
+    val tempTable = Range(1, 10, 1, 10)
     catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
@@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1").toSet ==
@@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables with pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
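The tests above now call a four-argument `Range(start, end, step, numSlices)` factory that is not visible in this diff. A hedged sketch of what such a companion object plausibly looks like; the default single `id` column of LongType is an assumption here, inferred from the `[id#8L]` output in the explain examples:

```scala
// Hypothetical companion factory (not shown in this diff): builds the default
// output of one non-nullable LongType column named "id" and delegates to the
// five-argument case class constructor.
object Range {
  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
    val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
    new Range(start, end, step, numSlices, output)
  }
}
```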
http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 664e7f5..555a2f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
-      case r @ logical.Range(start, end, step, numSlices, output) =>
-        execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil
+      case r: logical.Range =>
+        execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
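The strategy change above is the heart of the patch: instead of destructuring the logical Range and copying its fields into RangeExec, the planner hands the whole logical node to the physical operator. A self-contained toy sketch of the pattern (hypothetical class names, not Spark's API) showing why the two explain strings can no longer drift apart:

```scala
// Toy model of the pattern: the physical node embeds the logical node and
// delegates both its fields and its explain string to it.
final case class LogicalRange(start: Long, end: Long, step: Long, numSlices: Int) {
  def simpleString: String =
    if (step == 1) s"Range ($start, $end, splits=$numSlices)"
    else s"Range ($start, $end, step=$step, splits=$numSlices)"
}

final case class PhysicalRange(range: LogicalRange) {
  def start: Long = range.start                  // delegated accessors, as in RangeExec
  def step: Long = range.step
  def simpleString: String = range.simpleString  // identical text by construction
}

object EmbedDemo extends App {
  val logical = LogicalRange(0, 100, 2, 2)
  println(logical.simpleString)                       // Range (0, 100, step=2, splits=2)
  println("*" + PhysicalRange(logical).simpleString)  // *Range (0, 100, step=2, splits=2)
}
```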
http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d492fa7..89bde6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
@@ -305,22 +305,18 @@ case class SampleExec(
 
 /**
- * Physical plan for range (generating a range of 64 bit numbers.
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
+ * Physical plan for range (generating a range of 64 bit numbers).
  */
-case class RangeExec(
-    start: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    output: Seq[Attribute])
+case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   extends LeafExecNode with CodegenSupport {
 
+  def start: Long = range.start
+  def step: Long = range.step
+  def numSlices: Int = range.numSlices
+  def numElements: BigInt = range.numElements
+
+  override val output: Seq[Attribute] = range.output
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -458,6 +454,8 @@ case class RangeExec(
       }
     }
   }
+
+  override def simpleString: String = range.simpleString
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/845e447f/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index e4d4cec..cd434f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -58,7 +58,7 @@ class CatalogSuite
   }
 
   private def createTempTable(name: String): Unit = {
-    sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+    sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true)
   }
 
   private def dropTable(name: String, db: Option[String] = None): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org