spark-commits mailing list archives

Subject spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Date Thu, 01 Jan 2015 00:03:11 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 14dbd8312 -> 434ea009c

[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle
bug that only occurs when running on a real cluster:

If you attempt to send a fire-and-forget message to a remote Akka actor and that message cannot
be serialized, then this seems to lead to more-or-less silent failures. As an optimization,
Akka skips message serialization for messages sent within the same JVM. As a result, Spark's
unit tests will never fail due to non-serializable Akka messages, but these will cause mostly-silent
failures when running on a real cluster.
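
Because same-JVM delivery never exercises serialization, a test has to force it explicitly to catch this class of bug. The following is a minimal sketch of such a check, assuming plain Java serialization (the mechanism that rejects non-Serializable classes). The names LocalOnlyPayload, RemoteSafePayload, and isJavaSerializable are hypothetical and are not part of Spark or Akka.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical message types (not Spark's): one that Java serialization
// rejects and one that it accepts.
class LocalOnlyPayload(val text: String)
class RemoteSafePayload(val text: String) extends Serializable

object SerializationCheck {
  // Returns true if Java serialization can write `msg`. A same-JVM send
  // skips this step, so a local test must perform it on purpose.
  def isJavaSerializable(msg: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(msg) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new LocalOnlyPayload("stop")))  // prints false
    println(isJavaSerializable(new RemoteSafePayload("stop"))) // prints true
  }
}
```

A check of this kind could be applied to each message type in a unit test, so that a non-serializable message fails locally rather than only on a cluster.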

Before this patch, here was the code for ReceiverMessage:

/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage

Since ReceiverMessage does not extend Serializable and StopReceiver is a regular `object`,
not a `case object`, StopReceiver will throw serialization errors. As a result, graceful receiver
shutdown is broken on real clusters (and local-cluster mode) but works in local modes. If
you want to reproduce this, try running the word count example from the Streaming Programming
Guide in the Spark shell:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
// Start the computation and let one batch interval elapse before stopping
ssc.start()
Thread.sleep(10000)
// Stop the SparkContext as well (first argument) and stop gracefully (second argument)
ssc.stop(true, true)

Prior to this patch, this would work correctly in local mode but fail when running against
a real cluster (it would report that some receivers were not shut down).
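
The difference between a regular `object` and a `case object`, and the effect of making the trait extend Serializable, can be illustrated with a small standalone sketch. The types below are hypothetical stand-ins that mirror the shapes described above; they are not Spark's classes, and the check covers only the write side of Java serialization.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-ins for the pre-patch shape: the trait is not Serializable.
sealed trait PlainMessage
object PlainStop extends PlainMessage      // regular object: not Serializable
case object CaseStop extends PlainMessage  // case objects are Serializable automatically

// Hypothetical stand-in for the post-patch shape.
sealed trait FixedMessage extends Serializable
object FixedStop extends FixedMessage      // inherits Serializable from the trait

object ObjectShapes {
  // True if Java serialization can write the given message.
  def canSerialize(msg: AnyRef): Boolean =
    Try(new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(msg)).isSuccess

  def main(args: Array[String]): Unit = {
    println(s"PlainStop: ${canSerialize(PlainStop)}") // prints PlainStop: false
    println(s"CaseStop: ${canSerialize(CaseStop)}")   // prints CaseStop: true
    println(s"FixedStop: ${canSerialize(FixedStop)}") // prints FixedStop: true
  }
}
```

Extending Serializable at the trait level, as the patch does, covers every current and future message object without requiring each one to be declared as a `case object`.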

Author: Josh Rosen <>

Closes #3857 from JoshRosen/SPARK-5035 and squashes the following commits:

71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.

(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das <>


Branch: refs/heads/branch-1.2
Commit: 434ea009cd7efb2c29e88a889e87f501647a7fa6
Parents: 14dbd83
Author: Josh Rosen <>
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das <>
Committed: Wed Dec 31 16:03:03 2014 -0800

 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver
 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage
