spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-21743][SQL] top-most limit should not cause memory leak
Date Thu, 17 Aug 2017 05:37:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master b8ffb5105 -> a45133b82


[SPARK-21743][SQL] top-most limit should not cause memory leak

## What changes were proposed in this pull request?

For a top-most limit, we use a special operator to execute it: `CollectLimitExec`.

`CollectLimitExec` retrieves `n` rows (where `n` is the limit) from each partition of the child
plan's output, see https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L311.
It is therefore very likely that the child plan's output is not exhausted.
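
As an illustration (a simplified sketch of the take-style consumption pattern, not Spark's
actual implementation), pulling only a prefix of an iterator leaves the rest unread:

```scala
// A partition's output, modeled as a plain Scala iterator for illustration.
val partition: Iterator[Long] = Iterator.tabulate(1000)(_.toLong)

// Only the first n elements are pulled; the remaining 990 are never consumed,
// so cleanup that runs only when the iterator is exhausted never happens.
val n = 10
val firstN: Array[Long] = partition.take(n).toArray
```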

This is fine when whole-stage codegen is off, as the child plan releases its resources via a
task completion listener. However, when whole-stage codegen is on, the resources can only be
released once all of the output has been consumed.
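
For context, the listener-based cleanup mentioned above looks roughly like this (a sketch only;
`releaseResources` is a hypothetical placeholder for whatever the operator holds):

```scala
import org.apache.spark.TaskContext

// Sketch: register cleanup with the running task, so resources are freed when
// the task completes even if the output iterator was only partly consumed.
def withCleanup[T](iter: Iterator[T])(releaseResources: () => Unit): Iterator[T] = {
  Option(TaskContext.get()).foreach { ctx =>
    ctx.addTaskCompletionListener { _ => releaseResources() }
  }
  iter
}
```

Generated whole-stage code has no such per-operator hook here; it releases its resources only
after the produce loop has emitted all rows, which is why an unexhausted output can leak.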

To fix this memory leak, one simple approach is to ensure that when `CollectLimitExec` retrieves
`n` rows from the child plan's output, the child plan's output contains only `n` rows; the output
is then exhausted and the resources are released. This can be done by wrapping the child plan
with `LocalLimit`, as sketched below.
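
Conceptually, the planner change below turns the physical plan from `CollectLimit(n, child)` into
`CollectLimit(n, LocalLimit(n, child))`. A spark-shell session can show the resulting shape (the
exact plan string is illustrative):

```scala
// After this patch, a LocalLimit sits under the top-most CollectLimit, so each
// partition emits at most `limit` rows and its output is fully consumed.
val df = spark.range(100).groupBy("id").count().limit(1)
df.queryExecution.executedPlan
// CollectLimit 1
// +- LocalLimit 1
//    +- ... aggregate and exchange operators ...
```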

## How was this patch tested?

A regression test; see the new test case in `SQLQuerySuite` in the diff below.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18955 from cloud-fan/leak.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a45133b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a45133b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a45133b8

Branch: refs/heads/master
Commit: a45133b826984b7856e16d754e01c82702016af7
Parents: b8ffb51
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Wed Aug 16 22:37:45 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Wed Aug 16 22:37:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/SparkStrategies.scala     | 7 ++++++-
 .../main/scala/org/apache/spark/sql/execution/limit.scala    | 8 ++++++++
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  | 5 +++++
 3 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a45133b8/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 691f71a..2e8ce45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           execution.TakeOrderedAndProjectExec(
             limit, order, projectList, planLater(child)) :: Nil
         case logical.Limit(IntegerLiteral(limit), child) =>
-          execution.CollectLimitExec(limit, planLater(child)) :: Nil
+          // Normally wrapping child with `LocalLimitExec` here is a no-op, because
+          // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
+          // calls `child.executeTake`. If child supports whole stage codegen, adding this
+          // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
+          // resource releasing work, after we consume `limit` rows.
+          execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
         case other => planLater(other) :: Nil
       }
       case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a45133b8/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 73a0f87..7cef556 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -54,6 +54,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
 
+  // Do not enable whole stage codegen for a single limit.
+  override def supportCodegen: Boolean = child match {
+    case plan: CodegenSupport => plan.supportCodegen
+    case _ => false
+  }
+
+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a45133b8/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e95f6db..923c6d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
   }
+
+  test("SPARK-21743: top-most limit should not cause memory leak") {
+    // In unit test, Spark will fail the query if memory leak detected.
+    spark.range(100).groupBy("id").count().limit(1).collect()
+  }
 }



