spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-9827] [SQL] fix fd leak in UnsafeRowSerializer
Date Thu, 13 Aug 2015 03:02:59 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7b13ed27c -> 7c35746c9


[SPARK-9827] [SQL] fix fd leak in UnsafeRowSerializer

Currently, UnsafeRowSerializer does not close the InputStream, which will cause an fd leak if the
InputStream has an open fd in it.

TODO: the fd could still be leaked if any items in the stream are not consumed. Currently
it relies on GC to close the fd in this case.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #8116 from davies/fd_leak.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c35746c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c35746c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c35746c

Branch: refs/heads/master
Commit: 7c35746c916cf0019367850e75a080d7e739dba0
Parents: 7b13ed2
Author: Davies Liu <davies@databricks.com>
Authored: Wed Aug 12 20:02:55 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Aug 12 20:02:55 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/UnsafeRowSerializer.scala     |  2 ++
 .../execution/UnsafeRowSerializerSuite.scala    | 31 ++++++++++++++++++--
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c35746c/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 3860c4b..5c18558 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -108,6 +108,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
       override def asKeyValueIterator: Iterator[(Int, UnsafeRow)] = {
         new Iterator[(Int, UnsafeRow)] {
           private[this] var rowSize: Int = dIn.readInt()
+          if (rowSize == EOF) dIn.close()
 
           override def hasNext: Boolean = rowSize != EOF
 
@@ -119,6 +120,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
             row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, numFields, rowSize)
             rowSize = dIn.readInt() // read the next row's size
             if (rowSize == EOF) { // We are returning the last row in this stream
+              dIn.close()
               val _rowTuple = rowTuple
               // Null these out so that the byte array can be garbage collected once the
entire
               // iterator has been consumed

http://git-wip-us.apache.org/repos/asf/spark/blob/7c35746c/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 40b47ae..bd02c73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
@@ -25,6 +25,18 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
 
+
+/**
+ * used to test close InputStream in UnsafeRowSerializer
+ */
+class ClosableByteArrayInputStream(buf: Array[Byte]) extends ByteArrayInputStream(buf) {
+  var closed: Boolean = false
+  override def close(): Unit = {
+    closed = true
+    super.close()
+  }
+}
+
 class UnsafeRowSerializerSuite extends SparkFunSuite {
 
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
@@ -52,8 +64,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
       serializerStream.writeValue(unsafeRow)
     }
     serializerStream.close()
-    val deserializerIter = serializer.deserializeStream(
-      new ByteArrayInputStream(baos.toByteArray)).asKeyValueIterator
+    val input = new ClosableByteArrayInputStream(baos.toByteArray)
+    val deserializerIter = serializer.deserializeStream(input).asKeyValueIterator
     for (expectedRow <- unsafeRows) {
       val actualRow = deserializerIter.next().asInstanceOf[(Integer, UnsafeRow)]._2
       assert(expectedRow.getSizeInBytes === actualRow.getSizeInBytes)
@@ -61,5 +73,18 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
       assert(expectedRow.getInt(1) === actualRow.getInt(1))
     }
     assert(!deserializerIter.hasNext)
+    assert(input.closed)
+  }
+
+  test("close empty input stream") {
+    val baos = new ByteArrayOutputStream()
+    val dout = new DataOutputStream(baos)
+    dout.writeInt(-1)  // EOF
+    dout.flush()
+    val input = new ClosableByteArrayInputStream(baos.toByteArray)
+    val serializer = new UnsafeRowSerializer(numFields = 2).newInstance()
+    val deserializerIter = serializer.deserializeStream(input).asKeyValueIterator
+    assert(!deserializerIter.hasNext)
+    assert(input.closed)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message