Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D266F187DA for ; Sat, 9 May 2015 00:15:20 +0000 (UTC) Received: (qmail 92741 invoked by uid 500); 9 May 2015 00:15:20 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 92712 invoked by uid 500); 9 May 2015 00:15:20 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 92703 invoked by uid 99); 9 May 2015 00:15:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 May 2015 00:15:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9717FE41A5; Sat, 9 May 2015 00:15:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andrewor14@apache.org To: commits@spark.apache.org Message-Id: <8a76acd5e1c040ba8cb5b8929f8bfd0d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-7469] [SQL] DAG visualization: show SQL query operators Date: Sat, 9 May 2015 00:15:20 +0000 (UTC) Repository: spark Updated Branches: refs/heads/branch-1.4 1eae47620 -> cafffd0c2 [SPARK-7469] [SQL] DAG visualization: show SQL query operators The DAG visualization currently displays only low-level Spark primitives (e.g. `map`, `reduceByKey`, `filter` etc.). For SQL, these aren't particularly useful. Instead, we should display higher level physical operators (e.g. `Filter`, `Exchange`, `ShuffleHashJoin`). cc marmbrus ----------------- **Before** ----------------- **After** (Pay attention to the words) ----------------- Author: Andrew Or Closes #5999 from andrewor14/dag-viz-sql and squashes the following commits: 0db23a4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql 1e211db [Andrew Or] Update comment 0d49fd6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-sql ffd237a [Andrew Or] Fix style 202dac1 [Andrew Or] Make ignoreParent false by default e61b1ab [Andrew Or] Visualize SQL operators, not low-level Spark primitives 569034a [Andrew Or] Add a flag to ignore parent settings and scopes (cherry picked from commit bd61f07039064833108070e19b752d4c46045766) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cafffd0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cafffd0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cafffd0c Branch: refs/heads/branch-1.4 Commit: cafffd0c294a1d5dd90599179a64bc42d7aa410b Parents: 1eae476 Author: Andrew Or Authored: Fri May 8 17:15:10 2015 -0700 Committer: Andrew Or Committed: Fri May 8 17:15:17 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/rdd/RDDOperationScope.scala | 20 ++++++++++----- .../columnar/InMemoryColumnarTableScan.scala | 2 +- .../apache/spark/sql/execution/Aggregate.scala | 2 +- .../apache/spark/sql/execution/Exchange.scala | 2 +- .../spark/sql/execution/ExistingRDD.scala | 2 +- .../org/apache/spark/sql/execution/Expand.scala | 2 +- .../apache/spark/sql/execution/Generate.scala | 2 +- .../sql/execution/GeneratedAggregate.scala | 2 +- .../spark/sql/execution/LocalTableScan.scala | 2 +- .../apache/spark/sql/execution/SparkPlan.scala | 19 +++++++++++--- .../org/apache/spark/sql/execution/Window.scala | 2 +- .../spark/sql/execution/basicOperators.scala | 26 ++++++++++---------- .../apache/spark/sql/execution/commands.scala | 2 +- .../spark/sql/execution/debug/package.scala | 4 +-- .../sql/execution/joins/BroadcastHashJoin.scala | 2 +- .../joins/BroadcastLeftSemiJoinHash.scala | 2 +- .../joins/BroadcastNestedLoopJoin.scala | 2 +- .../sql/execution/joins/CartesianProduct.scala | 2 +- .../sql/execution/joins/HashOuterJoin.scala | 2 +- .../sql/execution/joins/LeftSemiJoinBNL.scala | 2 +- .../sql/execution/joins/LeftSemiJoinHash.scala | 2 +- .../sql/execution/joins/ShuffledHashJoin.scala | 2 +- .../sql/execution/joins/SortMergeJoin.scala | 2 +- .../apache/spark/sql/execution/pythonUdfs.scala | 2 +- .../sql/parquet/ParquetTableOperations.scala | 4 +-- .../sql/hive/execution/HiveTableScan.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 4 ++- .../hive/execution/ScriptTransformation.scala | 2 +- 28 files changed, 71 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala index 9440d45..93ec606 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala @@ -102,16 +102,21 @@ private[spark] object RDDOperationScope { /** * Execute the given body such that all RDDs created in this body will have the same scope. * - * If nesting is allowed, this concatenates the previous scope with the new one in a way that - * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to - * this method executed in the body will have no effect. + * If nesting is allowed, any subsequent calls to this method in the given body will instantiate + * child scopes that are nested within our scope. Otherwise, these calls will take no effect. + * + * Additionally, the caller of this method may optionally ignore the configurations and scopes + * set by the higher level caller. In this case, this method will ignore the parent caller's + * intention to disallow nesting, and the new scope instantiated will not have a parent. This + * is useful for scoping physical operations in Spark SQL, for instance. * * Note: Return statements are NOT allowed in body. */ private[spark] def withScope[T]( sc: SparkContext, name: String, - allowNesting: Boolean)(body: => T): T = { + allowNesting: Boolean, + ignoreParent: Boolean = false)(body: => T): T = { // Save the old scope to restore it later val scopeKey = SparkContext.RDD_SCOPE_KEY val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY @@ -119,8 +124,11 @@ private[spark] object RDDOperationScope { val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson) val oldNoOverride = sc.getLocalProperty(noOverrideKey) try { - // Set the scope only if the higher level caller allows us to do so - if (sc.getLocalProperty(noOverrideKey) == null) { + if (ignoreParent) { + // Ignore all parent settings and scopes and start afresh with our own root scope + sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson) + } else if (sc.getLocalProperty(noOverrideKey) == null) { + // Otherwise, set the scope only if the higher level caller allows us to do so sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson) } // Optionally disallow the child body to override our scope http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index d9b6fb4..0ded1cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { if (enableAccumulators) { readPartitions.setValue(0) readBatches.setValue(0) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 18b1ba4..8d16749 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -121,7 +121,7 @@ case class Aggregate( } } - override def execute(): RDD[Row] = attachTree(this, "execute") { + protected override def doExecute(): RDD[Row] = attachTree(this, "execute") { if (groupingExpressions.isEmpty) { child.execute().mapPartitions { iter => val buffer = newAggregateBuffer() http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index f0d54cd..f02fa81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -109,7 +109,7 @@ case class Exchange( serializer } - override def execute(): RDD[Row] = attachTree(this , "execute") { + protected override def doExecute(): RDD[Row] = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 57effbf..a500269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon /** Physical plan node for scanning data from an RDD. */ private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { - override def execute(): RDD[Row] = rdd + protected override def doExecute(): RDD[Row] = rdd } /** Logical plan node for scanning data from a local collection. */ http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index 5758494..f16ca36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -43,7 +43,7 @@ case class Expand( // as UNKNOWN partitioning override def outputPartitioning: Partitioning = UnknownPartitioning(0) - override def execute(): RDD[Row] = attachTree(this, "execute") { + protected override def doExecute(): RDD[Row] = attachTree(this, "execute") { child.execute().mapPartitions { iter => // TODO Move out projection objects creation and transfer to // workers via closure. However we can't assume the Projection http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala index 5201e20..08d9079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala @@ -46,7 +46,7 @@ case class Generate( val boundGenerator = BindReferences.bindReference(generator, child.output) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { if (join) { child.execute().mapPartitions { iter => val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null)) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 5d9f202..2ec7d4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -66,7 +66,7 @@ case class GeneratedAggregate( override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val aggregatesToCompute = aggregateExpressions.flatMap { a => a.collect { case agg: AggregateExpression => agg} } http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala index ace9af5..03bee80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala @@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e private lazy val rdd = sqlContext.sparkContext.parallelize(rows) - override def execute(): RDD[Row] = rdd + protected override def doExecute(): RDD[Row] = rdd override def executeCollect(): Array[Row] = { http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 59c8980..435ac01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.Logging -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees} import org.apache.spark.sql.catalyst.expressions._ @@ -79,14 +79,25 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) /** - * Runs this query returning the result as an RDD. + * Returns the result of this query as an RDD[Row] by delegating to doExecute + * after adding query plan information to created RDDs for visualization. + * Concrete implementations of SparkPlan should override doExecute instead. */ - def execute(): RDD[Row] + final def execute(): RDD[Row] = { + RDDOperationScope.withScope(sparkContext, nodeName, false, true) { + doExecute() + } + } /** - * Runs this query returning the result as an array. + * Overridden by concrete implementations of SparkPlan. + * Produces the result of the query as an RDD[Row] */ + protected def doExecute(): RDD[Row] + /** + * Runs this query returning the result as an array. + */ def executeCollect(): Array[Row] = { execute().mapPartitions { iter => val converter = CatalystTypeConverters.createToScalaConverter(schema) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 217b559..c4327ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -112,7 +112,7 @@ case class Window( } } - def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().mapPartitions { iter => new Iterator[Row] { http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 5ca11e6..6cb67b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends @transient lazy val buildProjection = newMutableProjection(projectList, child.output) - override def execute(): RDD[Row] = child.execute().mapPartitions { iter => + protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter => val resuableProjection = buildProjection() iter.map(resuableProjection) } @@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output) - override def execute(): RDD[Row] = child.execute().mapPartitions { iter => + protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter => iter.filter(conditionEvaluator) } @@ -83,7 +83,7 @@ case class Sample( override def output: Seq[Attribute] = child.output // TODO: How to pick seed? - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { if (withReplacement) { child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed) } else { @@ -99,7 +99,7 @@ case class Sample( case class Union(children: Seq[SparkPlan]) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes override def output: Seq[Attribute] = children.head.output - override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute())) + protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute())) } /** @@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan) override def executeCollect(): Array[Row] = child.executeTake(limit) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => iter.take(limit).map(row => (false, row.copy())) @@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) // TODO: Terminal split should be implemented differently from non-terminal split. // TODO: Pick num splits based on |limit|. - override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1) + protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1) override def outputOrdering: Seq[SortOrder] = sortOrder } @@ -186,7 +186,7 @@ case class Sort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - override def execute(): RDD[Row] = attachTree(this, "sort") { + protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) iterator.map(_.copy()).toArray.sorted(ordering).iterator @@ -214,7 +214,7 @@ case class ExternalSort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - override def execute(): RDD[Row] = attachTree(this, "sort") { + protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) @@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode { override def requiredChildDistribution: Seq[Distribution] = if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().mapPartitions { iter => val hashSet = new scala.collection.mutable.HashSet[Row]() @@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().map(_.copy()).coalesce(numPartitions, shuffle) } } @@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } } @@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = children.head.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } } @@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan { def children: Seq[SparkPlan] = child :: Nil - def execute(): RDD[Row] = child.execute() + protected override def doExecute(): RDD[Row] = child.execute() } http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 388a818..49b361e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val converted = sideEffectResult.map(r => CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row]) sqlContext.sparkContext.parallelize(converted, 1) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 7107870..dffb265 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -125,7 +125,7 @@ package object debug { } } - def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().mapPartitions { iter => new Iterator[Row] { def hasNext: Boolean = iter.hasNext @@ -193,7 +193,7 @@ package object debug { def children: List[SparkPlan] = child :: Nil - def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().map { row => try typeCheck(row, child.schema) catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 926f5e6..05dd568 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -66,7 +66,7 @@ case class BroadcastHashJoin( sparkContext.broadcast(hashed) } - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val broadcastRelation = Await.result(broadcastFuture, timeout) streamedPlan.execute().mapPartitions { streamedIter => http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index 3ef1e0d..640fc26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash( override def output: Seq[Attribute] = left.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 6aaf35f..caad3df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin( @transient private lazy val boundCondition = newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index 1cbc983..191c00c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output ++ right.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val leftResults = left.execute().map(_.copy()) val rightResults = right.execute().map(_.copy()) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index a396c0f..4557439 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -183,7 +183,7 @@ case class HashOuterJoin( hashTable } - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val joinedRow = new JoinedRow() left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => // TODO this probably can be replaced by external sort (sort merged join?) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala index b03af41..036423e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala @@ -47,7 +47,7 @@ case class LeftSemiJoinBNL( @transient private lazy val boundCondition = newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index a04f2a6..8ad27ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -42,7 +42,7 @@ case class LeftSemiJoinHash( override def output: Seq[Attribute] = left.output - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index a6cd833..219525d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -43,7 +43,7 @@ case class ShuffledHashJoin( override def requiredChildDistribution: Seq[ClusteredDistribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashed = HashedRelation(buildIter, buildSideKeyGenerator) hashJoin(streamIter, hashed) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index b512366..1a39fb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -60,7 +60,7 @@ case class SortMergeJoin( private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = keys.map(SortOrder(_, Ascending)) - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val leftResults = left.execute().map(_.copy()) val rightResults = right.execute().map(_.copy()) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 58cb198..3dbc383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: def children: Seq[SparkPlan] = child :: Nil - def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { val childResults = child.execute().map(_.copy()) val parent = childResults.mapPartitions { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index aded126..75ac52d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan( } }.toArray - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { import parquet.filter2.compat.FilterCompat.FilterPredicateCompat val sc = sqlContext.sparkContext @@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable( /** * Inserts all rows into the Parquet file. */ - override def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { // TODO: currently we do not check whether the "schema"s are compatible // That means if one first creates a table and then INSERTs data with // and incompatible schema the execution will fail. It would be nice http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 0a5f19e..62dc416 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -129,7 +129,7 @@ case class HiveTableScan( } } - override def execute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) { + protected override def doExecute(): RDD[Row] = if (!relation.hiveQlTable.isPartitioned) { hadoopReader.makeRDDForTable(relation.hiveQlTable) } else { hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index de8954d..c0b0b10 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -258,5 +258,7 @@ case class InsertIntoHiveTable( override def executeCollect(): Array[Row] = sideEffectResult.toArray - override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) + protected override def doExecute(): RDD[Row] = { + sqlContext.sparkContext.parallelize(sideEffectResult, 1) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/cafffd0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 3eddda3..bfd26e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -54,7 +54,7 @@ case class ScriptTransformation( override def otherCopyArgs: Seq[HiveContext] = sc :: Nil - def execute(): RDD[Row] = { + protected override def doExecute(): RDD[Row] = { child.execute().mapPartitions { iter => val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org