spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-15822] [SQL] Prevent byte array backed classes from referencing freed memory
Date Fri, 17 Jun 2016 05:27:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master 513a03e41 -> 5ada60614


[SPARK-15822] [SQL] Prevent byte array backed classes from referencing freed memory

## What changes were proposed in this pull request?
`UTF8String` and all `Unsafe*` classes are backed by either on-heap or off-heap byte arrays.
The code generated version `SortMergeJoin` buffers the left hand side join keys during iteration.
This was actually problematic in off-heap mode when one of the keys is a `UTF8String` (or
any other 'Unsafe*` object) and the left hand side iterator was exhausted (and released its
memory); the buffered keys would reference freed memory. This causes Seg-faults and all kinds
of other undefined behavior when we would use one these buffered keys.

This PR fixes this problem by creating copies of the buffered variables. I have added a general
method to the `CodeGenerator` for this. I have checked all places in which this could happen,
and only `SortMergeJoin` had this problem.

This PR is largely based on the work of robbinspg and he should be credited for this.

closes https://github.com/apache/spark/pull/13707

## How was this patch tested?
Manually tested on problematic workloads.

Author: Pete Robbins <robbinspg@gmail.com>
Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #13723 from hvanhovell/SPARK-15822-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ada6061
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ada6061
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ada6061

Branch: refs/heads/master
Commit: 5ada606144c7bf38a797764619d7d1ff677802b3
Parents: 513a03e
Author: Pete Robbins <robbinspg@gmail.com>
Authored: Thu Jun 16 22:27:32 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Thu Jun 16 22:27:32 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala         | 16 ++++++++++++++++
 .../sql/execution/joins/SortMergeJoinExec.scala     |  8 +-------
 2 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ada6061/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ff97cd3..6392ff4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -130,6 +130,22 @@ class CodegenContext {
     mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add buffer variable which stores data coming from an [[InternalRow]]. This methods guarantees
+   * that the variable is safely stored, which is important for (potentially) byte array
backed
+   * data types like: UTF8String, ArrayData, MapData & InternalRow.
+   */
+  def addBufferedState(dataType: DataType, variableName: String, initCode: String): ExprCode
= {
+    val value = freshName(variableName)
+    addMutableState(javaType(dataType), value, "")
+    val code = dataType match {
+      case StringType => s"$value = $initCode.clone();"
+      case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();"
+      case _ => s"$value = $initCode;"
+    }
+    ExprCode(code, "false", value)
+  }
+
   def declareMutableStates(): String = {
     // It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
     // `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated
ones.

http://git-wip-us.apache.org/repos/asf/spark/blob/5ada6061/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 32f0bc5..fac6b8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -336,13 +336,7 @@ case class SortMergeJoinExec(
 
   private def copyKeys(ctx: CodegenContext, vars: Seq[ExprCode]): Seq[ExprCode] = {
     vars.zipWithIndex.map { case (ev, i) =>
-      val value = ctx.freshName("value")
-      ctx.addMutableState(ctx.javaType(leftKeys(i).dataType), value, "")
-      val code =
-        s"""
-           |$value = ${ev.value};
-         """.stripMargin
-      ExprCode(code, "false", value)
+      ctx.addBufferedState(leftKeys(i).dataType, "value", ev.value)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message