spark-commits mailing list archives

From gurwls...@apache.org
Subject spark git commit: [SPARK-24334] Fix race condition in ArrowPythonRunner that causes unclean shutdown of Arrow memory allocator
Date Mon, 28 May 2018 02:50:47 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a06fc45ba -> 9b0f6f530


[SPARK-24334] Fix race condition in ArrowPythonRunner that causes unclean shutdown of Arrow memory allocator

## What changes were proposed in this pull request?

There is a race condition when closing the Arrow VectorSchemaRoot and Allocator in the writer thread
of ArrowPythonRunner.

The race results in a memory-leak exception when closing the allocator. This patch removes the
closing routine from the TaskCompletionListener and makes the writer thread responsible for
cleaning up the Arrow memory.
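
To illustrate the ownership pattern the patch adopts, here is a minimal, Arrow-free sketch (the Resource class and main driver are hypothetical, not the actual Spark code): the thread that writes through a resource is also the only one that closes it, in its own finally block, rather than handing cleanup to an asynchronous callback that can fire while writes are still in flight.

```
import java.util.concurrent.atomic.AtomicBoolean

object WriterOwnedCleanup {
  // Stand-in for the VectorSchemaRoot/allocator pair: using it after close() is an error.
  final class Resource extends AutoCloseable {
    private val closed = new AtomicBoolean(false)
    def write(i: Int): Unit = {
      if (closed.get()) throw new IllegalStateException("write after close")
      // ... write a batch ...
    }
    override def close(): Unit = closed.set(true)
  }

  def main(args: Array[String]): Unit = {
    val resource = new Resource
    val writer = new Thread(() => {
      try {
        (1 to 1000).foreach(resource.write)  // the writer thread uses the resource...
      } finally {
        resource.close()                     // ...and is the only one to close it
      }
    })
    writer.start()
    // Before the patch, a completion callback on another thread could call
    // resource.close() around here, racing with the writes above.
    writer.join()
  }
}
```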

This issue can be reproduced by this test:

```
def test_memory_leak(self):
    from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode

    # Have all data in a single executor thread so it can trigger the race condition easier
    with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
        df = self.spark.range(0, 1000)
        df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
               .withColumn('id', explode(col('id'))) \
               .withColumn('v', array([lit(i) for i in range(0, 1000)]))

        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def foo(pdf):
            xxx  # intentionally undefined; raises NameError so the UDF fails mid-stream
            return pdf

        result = df.groupby('id').apply(foo)

        with QuietTest(self.sc):
            with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
                result.count()
            self.assertTrue('Memory leaked' not in str(context.exception))
```
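
The failing UDF makes the task abort mid-stream, so task completion fires while the writer thread may still be writing to the VectorSchemaRoot, which is exactly the window in which the pre-patch cleanup could race.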

Note: Because of the race condition, this test cannot reproduce the issue reliably, so it is not
added to the test suite.

## How was this patch tested?

Because of the race condition, the bug cannot easily be covered by a unit test. So far it has only
been observed with large amounts of data. This patch is tested manually.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak.

(cherry picked from commit 672209f2909a95e891f3c779bfb2f0e534239851)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b0f6f53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b0f6f53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b0f6f53

Branch: refs/heads/branch-2.3
Commit: 9b0f6f530dcc86c0865744f0d51c9577b12aa216
Parents: a06fc45
Author: Li Jin <ice.xelloss@gmail.com>
Authored: Mon May 28 10:50:17 2018 +0800
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Mon May 28 10:50:42 2018 +0800

----------------------------------------------------------------------
 .../execution/python/ArrowPythonRunner.scala    | 29 ++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b0f6f53/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 5fcdcdd..01e19bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -70,19 +70,13 @@ class ArrowPythonRunner(
         val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
         val allocator = ArrowUtils.rootAllocator.newChildAllocator(
           s"stdout writer for $pythonExec", 0, Long.MaxValue)
-
         val root = VectorSchemaRoot.create(arrowSchema, allocator)
-        val arrowWriter = ArrowWriter.create(root)
-
-        context.addTaskCompletionListener { _ =>
-          root.close()
-          allocator.close()
-        }
-
-        val writer = new ArrowStreamWriter(root, null, dataOut)
-        writer.start()
 
         Utils.tryWithSafeFinally {
+          val arrowWriter = ArrowWriter.create(root)
+          val writer = new ArrowStreamWriter(root, null, dataOut)
+          writer.start()
+
           while (inputIterator.hasNext) {
             val nextBatch = inputIterator.next()
 
@@ -94,8 +88,21 @@ class ArrowPythonRunner(
             writer.writeBatch()
             arrowWriter.reset()
           }
-        } {
+          // writer.end() writes the footer to the output stream and doesn't clean up any
+          // resources. It could throw an exception if the output stream is closed, so it
+          // should be in the try block.
           writer.end()
+        } {
+          // If we close root and allocator in a TaskCompletionListener, there could be a race
+          // condition where the writer thread keeps writing to the VectorSchemaRoot while
+          // it is being closed by the TaskCompletionListener.
+          // Closing root and allocator here is cleaner because they are owned by the writer
+          // thread and are only visible to the writer thread.
+          //
+          // If the writer thread is interrupted by the TaskCompletionListener, it is either
+          // (1) in the try block, in which case it will get an InterruptedException when
+          // performing IO and fall through to the finally block, or (2) in the finally block,
+          // in which case it will ignore the interruption and close the resources.
           root.close()
           allocator.close()
         }
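
The surrounding try/finally structure comes from Utils.tryWithSafeFinally. A simplified sketch of its semantics follows (an assumption about org.apache.spark.util.Utils; the real helper also logs the suppressed error): the cleanup block always runs, and when both the body and the cleanup throw, the body's exception keeps propagating with the cleanup's exception attached as suppressed, so an error from writer.end() is not masked by a failure in root.close() or allocator.close().

```
// Simplified sketch of the tryWithSafeFinally pattern; not the actual Spark source.
def tryWithSafeFinallySketch[T](block: => T)(finallyBlock: => Unit): T = {
  var original: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      original = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      // Attach the cleanup failure to the in-flight exception instead of
      // letting it replace (mask) the original error from the try block.
      case t: Throwable if original != null && original != t =>
        original.addSuppressed(t)
    }
  }
}
```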

