spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-16432] Empty blocks fail to serialize due to assert in ChunkedByteBuffer
Date Sat, 09 Jul 2016 03:18:53 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6cef0183c -> d8b06f18d


[SPARK-16432] Empty blocks fail to serialize due to assert in ChunkedByteBuffer

## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner
to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests, also checked that the original reproduction case in https://github.com/apache/spark/pull/11748#issuecomment-230760283
is resolved.

Author: Eric Liang <ekl@databricks.com>

Closes #14099 from ericl/spark-16432.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8b06f18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8b06f18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8b06f18

Branch: refs/heads/master
Commit: d8b06f18dc3e35938d15099beac98221d6f528b5
Parents: 6cef018
Author: Eric Liang <ekl@databricks.com>
Authored: Fri Jul 8 20:18:49 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Fri Jul 8 20:18:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/io/ChunkedByteBuffer.scala    |  9 ++++-----
 .../org/apache/spark/io/ChunkedByteBufferSuite.scala    | 12 ++++--------
 2 files changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8b06f18/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index fb4706e..89b0874 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -31,14 +31,13 @@ import org.apache.spark.storage.StorageUtils
  * Read-only byte buffer which is physically stored as multiple chunks rather than a single
  * contiguous array.
  *
- * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
- *               position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
- *               so if these buffers may also be used elsewhere then the caller is responsible for
- *               copying them as needed.
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must have position == 0.
+ *               Ownership of these buffers is transferred to the ChunkedByteBuffer, so if these
+ *               buffers may also be used elsewhere then the caller is responsible for copying
+ *               them as needed.
  */
 private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
-  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
 
   private[this] var disposed: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/d8b06f18/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index f205d4f..38b48a4 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -38,12 +38,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     emptyChunkedByteBuffer.toInputStream(dispose = true).close()
   }
 
-  test("chunks must be non-empty") {
-    intercept[IllegalArgumentException] {
-      new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
-    }
-  }
-
   test("getChunks() duplicates chunks") {
     val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
     chunkedByteBuffer.getChunks().head.position(4)
@@ -63,8 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
   }
 
   test("toArray()") {
+    val empty = ByteBuffer.wrap(Array[Byte]())
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
     assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
   }
 
@@ -79,9 +74,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
   }
 
   test("toInputStream()") {
+    val empty = ByteBuffer.wrap(Array[Byte]())
     val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
     val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
     assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
 
     val inputStream = chunkedByteBuffer.toInputStream(dispose = false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message