spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming
Date Wed, 30 Nov 2016 07:45:10 GMT
Repository: spark
Updated Branches:
  refs/heads/master 879ba7111 -> 56c82edab


[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming

## What changes were proposed in this pull request?

#15992 provided a solution to fix the bug, i.e. **receiver data can not be deserialized properly**.
As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases.
It may be a rational choice to close auto pick kryo serializer for Spark Streaming in the
first step. I will continue #15992 to optimize the solution.

## How was this patch tested?

existing ut

Author: uncleGen <hustyugm@gmail.com>

Closes #16052 from uncleGen/SPARK-18617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56c82eda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56c82eda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56c82eda

Branch: refs/heads/master
Commit: 56c82edabd62db9e936bb9afcf300faf8ef39362
Parents: 879ba71
Author: uncleGen <hustyugm@gmail.com>
Authored: Tue Nov 29 23:45:06 2016 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Nov 29 23:45:06 2016 -0800

----------------------------------------------------------------------
 .../spark/serializer/SerializerManager.scala    | 16 ++++---
 .../spark/storage/memory/MemoryStore.scala      |  5 ++-
 .../storage/PartiallySerializedBlockSuite.scala |  6 ++-
 .../spark/streaming/StreamingContextSuite.scala | 47 ++++++++++++++++++++
 4 files changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56c82eda/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index ef8432e..7371f88 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -79,8 +79,11 @@ private[spark] class SerializerManager(
     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
   }
 
-  def getSerializer(ct: ClassTag[_]): Serializer = {
-    if (canUseKryo(ct)) {
+  // SPARK-18617: As feature in SPARK-13990 can not be applied to Spark Streaming now. The worst
+  // result is streaming job based on `Receiver` mode can not run on Spark 2.x properly. It may be
+  // a rational choice to close `kryo auto pick` feature for streaming in the first step.
+  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
+    if (autoPick && canUseKryo(ct)) {
       kryoSerializer
     } else {
       defaultSerializer
@@ -161,7 +164,8 @@ private[spark] class SerializerManager(
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
   }
 
@@ -177,7 +181,8 @@ private[spark] class SerializerManager(
       classTag: ClassTag[_]): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
-    val ser = getSerializer(classTag).newInstance()
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
     ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
@@ -191,7 +196,8 @@ private[spark] class SerializerManager(
       inputStream: InputStream)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(classTag)
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    getSerializer(classTag, autoPick)
       .newInstance()
       .deserializeStream(wrapStream(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]

http://git-wip-us.apache.org/repos/asf/spark/blob/56c82eda/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 095d324..fff2121 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -334,7 +334,8 @@ private[spark] class MemoryStore(
     val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
     redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
-      val ser = serializerManager.getSerializer(classTag).newInstance()
+      val autoPick = !blockId.isInstanceOf[StreamBlockId]
+      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
       ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56c82eda/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index ec4f263..3050f9a 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -67,7 +67,8 @@ class PartiallySerializedBlockSuite
       spy
     }
 
-    val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+    val serializer = serializerManager
+      .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
     val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream)
     redirectableOutputStream.setOutputStream(bbos)
     val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream))
@@ -182,7 +183,8 @@ class PartiallySerializedBlockSuite
       Mockito.verifyNoMoreInteractions(memoryStore)
       Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer, atLeastOnce).dispose()
 
-      val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+      val serializer = serializerManager
+        .getSerializer(implicitly[ClassTag[T]], autoPick = true).newInstance()
       val deserialized =
         serializer.deserializeStream(new ByteBufferInputStream(bbos.toByteBuffer)).asIterator.toSeq
       assert(deserialized === items)

http://git-wip-us.apache.org/repos/asf/spark/blob/56c82eda/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5..45d8f50 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc.stop()
   }
 
+  test("SPARK-18560 Receiver data should be deserialized properly.") {
+    // Start a two nodes cluster, so receiver will use one node, and Spark jobs will use the
+    // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+    ssc = new StreamingContext(conf, Milliseconds(100))
+    val input = ssc.receiverStream(new FakeByteArrayReceiver)
+    input.count().foreachRDD { rdd =>
+      // Make sure we can read from BlockRDD
+      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+        // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        new Thread() {
+          setDaemon(true)
+          override def run(): Unit = {
+            ssc.stop(stopSparkContext = true, stopGracefully = false)
+          }
+        }.start()
+      }
+    }
+    ssc.start()
+    ssc.awaitTerminationOrTimeout(60000)
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)
@@ -869,6 +891,31 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
+
+  val data: Array[Byte] = "test".getBytes
+  var receivingThreadOption: Option[Thread] = None
+
+  override def onStart(): Unit = {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        while (!isStopped) {
+          store(data)
+        }
+        logInfo("Receiving stopped")
+      }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
+  }
+
+  override def onStop(): Unit = {
+    // no clean to be done, the receiving thread should stop on it own, so just wait for it.
+    receivingThreadOption.foreach(_.join())
+  }
+}
+
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
 class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message