spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in aggregation.
Date Thu, 20 Aug 2015 14:53:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master 43e013542 -> b4f4e91c3


[SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in aggregation.

This improves performance by ~ 20 - 30% in one of my local test and should fix the performance
regression from 1.4 to 1.5 on ss_max.

Author: Reynold Xin <rxin@databricks.com>

Closes #8332 from rxin/SPARK-10100.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4f4e91c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4f4e91c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4f4e91c

Branch: refs/heads/master
Commit: b4f4e91c395cb69ced61d9ff1492d1b814f96828
Parents: 43e0135
Author: Reynold Xin <rxin@databricks.com>
Authored: Thu Aug 20 07:53:27 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Aug 20 07:53:27 2015 -0700

----------------------------------------------------------------------
 .../execution/aggregate/TungstenAggregate.scala |  2 +-
 .../aggregate/TungstenAggregationIterator.scala | 30 ++++++++++++++------
 2 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4f4e91c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 99f51ba..ba379d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -104,7 +104,7 @@ case class TungstenAggregate(
         } else {
           // This is a grouped aggregate and the input iterator is empty,
           // so return an empty iterator.
-          Iterator[UnsafeRow]()
+          Iterator.empty
         }
       } else {
         aggregationIterator.start(parentIterator)

http://git-wip-us.apache.org/repos/asf/spark/blob/b4f4e91c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index af7e0fc..26fdbc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -357,18 +357,30 @@ class TungstenAggregationIterator(
   // sort-based aggregation (by calling switchToSortBasedAggregation).
   private def processInputs(): Unit = {
     assert(inputIter != null, "attempted to process input when iterator was null")
-    while (!sortBased && inputIter.hasNext) {
-      val newInput = inputIter.next()
-      numInputRows += 1
-      val groupingKey = groupProjection.apply(newInput)
+    if (groupingExpressions.isEmpty) {
+      // If there is no grouping expressions, we can just reuse the same buffer over and over again.
+      // Note that it would be better to eliminate the hash map entirely in the future.
+      val groupingKey = groupProjection.apply(null)
       val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
-      if (buffer == null) {
-        // buffer == null means that we could not allocate more memory.
-        // Now, we need to spill the map and switch to sort-based aggregation.
-        switchToSortBasedAggregation(groupingKey, newInput)
-      } else {
+      while (inputIter.hasNext) {
+        val newInput = inputIter.next()
+        numInputRows += 1
         processRow(buffer, newInput)
       }
+    } else {
+      while (!sortBased && inputIter.hasNext) {
+        val newInput = inputIter.next()
+        numInputRows += 1
+        val groupingKey = groupProjection.apply(newInput)
+        val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
+        if (buffer == null) {
+          // buffer == null means that we could not allocate more memory.
+          // Now, we need to spill the map and switch to sort-based aggregation.
+          switchToSortBasedAggregation(groupingKey, newInput)
+        } else {
+          processRow(buffer, newInput)
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message