spark-commits mailing list archives

From: wenc...@apache.org
Subject: spark git commit: [SPARK-15495][SQL] Improve the explain output for Aggregation operator
Date: Wed, 01 Jun 2016 16:58:13 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cb254ecb1 -> 9406a3c9a


[SPARK-15495][SQL] Improve the explain output for Aggregation operator

## What changes were proposed in this pull request?

This PR improves the explain output of the Aggregation operator.

Example:

```
Seq((1,2,3)).toDF("a", "b", "c").createTempView("df1")
spark.sql("cache table df1")
spark.sql("select count(a), count(c), b from df1 group by b").explain()
```

**Before change:**

```
*TungstenAggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *TungstenAggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```

**After change:**

```
*Aggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
+- Exchange hashpartitioning(b#8, 200), None
   +- *Aggregate(key=[b#8], functions=[partial_count(1),partial_count(1)], output=[b#8,count#98L,count#99L])
      +- InMemoryTableScan [b#8], InMemoryRelation [a#7,b#8,c#9], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), LocalTableScan [a#7,b#8,c#9], [[1,2,3]], Some(df1)
```
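
The compact `StorageLevel` rendering shown above applies anywhere a storage level is printed, not only in explain output. For instance, in a Spark shell built with this patch (illustrative session; the output follows from the new `toString`):

```
scala> org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK.toString
res0: String = StorageLevel(disk, memory, deserialized, 1 replicas)
```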

## How was this patch tested?

Manual tests and existing unit tests.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13363 from clockfly/verbose3.

(cherry picked from commit d5012c274036463c47a751cfe9977ade3a68b668)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9406a3c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9406a3c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9406a3c9

Branch: refs/heads/branch-2.0
Commit: 9406a3c9a990bfc953e3bdc1ce558ae220176fa8
Parents: cb254ec
Author: Sean Zhong <seanzhong@databricks.com>
Authored: Wed Jun 1 09:58:01 2016 -0700
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Jun 1 09:58:09 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/StorageLevel.scala     | 10 ++++++++--
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala    |  2 +-
 .../sql/execution/aggregate/SortBasedAggregateExec.scala  |  2 +-
 .../spark/sql/execution/aggregate/TungstenAggregate.scala |  4 ++--
 4 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9406a3c9/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 216ec07..fad0404 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -120,8 +120,14 @@ class StorageLevel private(
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
 
   override def toString: String = {
-    s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
-      s"deserialized=$deserialized, replication=$replication)"
+    val disk = if (useDisk) "disk" else ""
+    val memory = if (useMemory) "memory" else ""
+    val heap = if (useOffHeap) "offheap" else ""
+    val deserialize = if (deserialized) "deserialized" else ""
+
+    val output =
+      Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
+    s"StorageLevel(${output.mkString(", ")})"
   }
 
   override def hashCode(): Int = toInt * 41 + replication
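
For readers skimming the hunk, here is the same logic as a self-contained sketch (a hypothetical `describe` helper mirroring the patched `toString`; not part of the commit):

```
// Mirrors the new StorageLevel.toString: flags that are false are dropped,
// so only the enabled properties and the replication factor are printed.
def describe(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int): String = {
  val output = Seq(
    if (useDisk) "disk" else "",
    if (useMemory) "memory" else "",
    if (useOffHeap) "offheap" else "",
    if (deserialized) "deserialized" else "",
    s"$replication replicas").filter(_.nonEmpty)
  s"StorageLevel(${output.mkString(", ")})"
}

// MEMORY_AND_DISK is (disk=true, memory=true, offheap=false, deserialized=true):
describe(useDisk = true, useMemory = true, useOffHeap = false,
  deserialized = true, replication = 1)
// => StorageLevel(disk, memory, deserialized, 1 replicas)
```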

http://git-wip-us.apache.org/repos/asf/spark/blob/9406a3c9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index e8e2a7b..d87e6c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -434,7 +434,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case other => other :: Nil
   }.mkString(", ")
 
-  /** String representation of this node without any children. */
+  /** ONE line description of this node. */
   def simpleString: String = s"$nodeName $argString".trim
 
   override def toString: String = treeString

http://git-wip-us.apache.org/repos/asf/spark/blob/9406a3c9/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
index 2e74d59..af1fb4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
@@ -106,6 +106,6 @@ case class SortBasedAggregateExec(
     val keyString = groupingExpressions.mkString("[", ",", "]")
     val functionString = allAggregateExpressions.mkString("[", ",", "]")
     val outputString = output.mkString("[", ",", "]")
-    s"SortBasedAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+    s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9406a3c9/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 905e93c..0911779 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -769,9 +769,9 @@ case class TungstenAggregate(
         val keyString = groupingExpressions.mkString("[", ",", "]")
         val functionString = allAggregateExpressions.mkString("[", ",", "]")
         val outputString = output.mkString("[", ",", "]")
-        s"TungstenAggregate(key=$keyString, functions=$functionString, output=$outputString)"
+        s"Aggregate(key=$keyString, functions=$functionString, output=$outputString)"
       case Some(fallbackStartsAt) =>
-        s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+        s"AggregateWithControlledFallback $groupingExpressions " +
           s"$allAggregateExpressions $resultExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
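
Both renamed operators assemble their one-line descriptions the same way. As a minimal standalone sketch (hypothetical helper, with plain strings standing in for Catalyst expressions; not code from the commit):

```
// Sketch of how the renamed aggregate nodes build their one-line
// explain entries; the real code formats Catalyst expression trees instead.
def aggregateSimpleString(
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[String],
    output: Seq[String]): String = {
  val keyString = groupingExpressions.mkString("[", ",", "]")
  val functionString = aggregateExpressions.mkString("[", ",", "]")
  val outputString = output.mkString("[", ",", "]")
  s"Aggregate(key=$keyString, functions=$functionString, output=$outputString)"
}

// Reproduces the first line of the "After change" plan above:
aggregateSimpleString(
  Seq("b#8"),
  Seq("count(1)", "count(1)"),
  Seq("count(a)#79L", "count(c)#80L", "b#8"))
// => Aggregate(key=[b#8], functions=[count(1),count(1)], output=[count(a)#79L,count(c)#80L,b#8])
```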

