spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-21258][SQL] Fix WindowExec complex object aggregation with spilling
Date Fri, 30 Jun 2017 04:34:13 GMT
Repository: spark
Updated Branches:
  refs/heads/master cfc696f4a -> e2f32ee45


[SPARK-21258][SQL] Fix WindowExec complex object aggregation with spilling

## What changes were proposed in this pull request?
`WindowExec` currently stores complex objects (`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`,
`UTF8String`) incorrectly during aggregation: the buffer used by `GeneratedMutableProjections`
keeps a reference to the actual input data instead of a copy. Things go wrong when the input
object (or its backing bytes) is reused for something else. This can happen in window functions
once they start spilling to disk: when reading back the spill files, the `UnsafeSorterSpillReader`
reuses the buffer to which the `UnsafeRow` points, leading to subtle corruption. Note that this
only happens for aggregate functions that preserve (parts of) their input, for example `FIRST`,
`LAST`, `MIN`, and `MAX`.
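
To make the failure mode concrete, here is a minimal, self-contained Scala analogy (illustrative
names, not Spark's actual classes): a `FIRST`-style aggregator retains a reference to its input
while the caller reuses the backing bytes, so the retained value silently changes.

    // Hypothetical sketch of the aliasing hazard; `FirstAgg` is illustrative only.
    class FirstAgg {
      private var first: Array[Byte] = _
      def update(input: Array[Byte]): Unit = {
        if (first == null) first = input   // keeps a reference, no defensive copy
      }
      def result: String = new String(first, "UTF-8")
    }

    val buffer = "row-1".getBytes("UTF-8") // one buffer, reused per record
    val agg = new FirstAgg
    agg.update(buffer)
    // The next record is read into the same buffer, as the spill reader does:
    "row-2".getBytes("UTF-8").copyToArray(buffer)
    println(agg.result)                    // prints "row-2", not "row-1"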

This was not seen before because the spilling logic rarely performed actual spills and used an
in-memory page instead. That page was not cleaned up during window processing, which ensured
that unsafe objects pointed to their own dedicated memory locations. This changed with
https://github.com/apache/spark/pull/16909; since that PR, Spark spills more eagerly.

This PR provides a surgical fix because we are close to releasing Spark 2.2. The change simply
makes sure that there cannot be any object reuse, at the expense of a little bit of performance.
We will follow up with a more subtle solution at a later point.
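
For context, the copy introduced in the diff below relies on `InternalRow.copy()`, which gives
the aggregate a row whose storage it owns. A minimal sketch of the difference, assuming a
Spark 2.2-era classpath (`reused`, `alias`, and `owned` are illustrative names):

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    val reused = new GenericInternalRow(Array[Any](UTF8String.fromString("a")))
    val alias  = reused        // what the old code effectively kept
    val owned  = reused.copy() // what the fix keeps instead
    reused.update(0, UTF8String.fromString("b")) // the input row is overwritten
    // alias.getUTF8String(0) now reads "b"; owned.getUTF8String(0) still reads "a"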

## How was this patch tested?
Added a regression test to `DataFrameWindowFunctionsSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #18470 from hvanhovell/SPARK-21258.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2f32ee4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2f32ee4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2f32ee4

Branch: refs/heads/master
Commit: e2f32ee45ac907f1f53fde7e412676a849a94872
Parents: cfc696f
Author: Herman van Hovell <hvanhovell@databricks.com>
Authored: Fri Jun 30 12:34:09 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Fri Jun 30 12:34:09 2017 +0800

----------------------------------------------------------------------
 .../execution/window/AggregateProcessor.scala   |  7 ++-
 .../sql/DataFrameWindowFunctionsSuite.scala     | 47 +++++++++++++++++++-
 2 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2f32ee4/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index bc141b3..2195c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -145,10 +145,13 @@ private[window] final class AggregateProcessor(
 
   /** Update the buffer. */
   def update(input: InternalRow): Unit = {
-    updateProjection(join(buffer, input))
+    // TODO(hvanhovell) this sacrifices performance for correctness. We should make sure that
+    // MutableProjection makes copies of the complex input objects it buffers.
+    val copy = input.copy()
+    updateProjection(join(buffer, copy))
     var i = 0
     while (i < numImperatives) {
-      imperatives(i).update(buffer, input)
+      imperatives(i).update(buffer, copy)
       i += 1
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e2f32ee4/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 1255c49..204858f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{DataType, LongType, StructType}
+import org.apache.spark.sql.types._
 
 /**
  * Window function testing for DataFrame API.
@@ -423,4 +424,48 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(selectList: _*).where($"value" < 2),
       Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
   }
+
+  test("SPARK-21258: complex object in combination with spilling") {
+    // Make sure we trigger the spilling path.
+    withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") {
+      val sampleSchema = new StructType().
+        add("f0", StringType).
+        add("f1", LongType).
+        add("f2", ArrayType(new StructType().
+          add("f20", StringType))).
+        add("f3", ArrayType(new StructType().
+          add("f30", StringType)))
+
+      val w0 = Window.partitionBy("f0").orderBy("f1")
+      val w1 = w0.rowsBetween(Long.MinValue, Long.MaxValue)
+
+      val c0 = first(struct($"f2", $"f3")).over(w0) as "c0"
+      val c1 = last(struct($"f2", $"f3")).over(w1) as "c1"
+
+      val input =
+        """{"f1":1497820153720,"f2":[{"f20":"x","f21":0}],"f3":[{"f30":"x","f31":0}]}
+          |{"f1":1497802179638}
+          |{"f1":1497802189347}
+          |{"f1":1497802189593}
+          |{"f1":1497802189597}
+          |{"f1":1497802189599}
+          |{"f1":1497802192103}
+          |{"f1":1497802193414}
+          |{"f1":1497802193577}
+          |{"f1":1497802193709}
+          |{"f1":1497802202883}
+          |{"f1":1497802203006}
+          |{"f1":1497802203743}
+          |{"f1":1497802203834}
+          |{"f1":1497802203887}
+          |{"f1":1497802203893}
+          |{"f1":1497802203976}
+          |{"f1":1497820168098}
+          |""".stripMargin.split("\n").toSeq
+
+      import testImplicits._
+
+      spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () }
+    }
+  }
 }
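
For anyone reproducing this outside the test suite: the regression test forces the spill path by
lowering the WindowExec spill threshold. A hedged spark-shell sketch; the conf key is assumed to
match `SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD` in this era of Spark:

    // Force WindowExec to spill after a handful of buffered rows,
    // mirroring what the test above does through withSQLConf.
    spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "17")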

