spark-dev mailing list archives

From Michael Malak <>
Subject Kafka not shutting down cleanly; Actor serialization?
Date Thu, 26 Sep 2013 18:27:21 GMT

I don't believe Kafka streams are being shut down cleanly, which means the most recent
Kafka offsets are not being committed back to Zookeeper, so starting or restarting a Spark
Streaming process would result in duplicate events.

The simple Spark Streaming code pasted at the end of this e-mail (running in local mode),
which uses a hard-coded queueStream as its only input stream, exits cleanly once the
sentinel file is detected. However, if the queueStream is replaced with a kafkaStream,
the process never exits (unless I put a System.exit() as the very last line -- to forcibly
kill all threads).

In attempting to understand the Kafka shutdown process, I traced through the Spark Streaming
codebase with println()s. I noticed the following:

1. Although KafkaInputDStream.scala initializes the class member variable consumerConnector
in onStart(), I don't see a corresponding consumerConnector.shutdown() anywhere, such as in
onStop(). It is my understanding that it is the consumer's shutdown() that commits the offsets
back to Zookeeper. See the Kafka example at
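
If the missing consumer shutdown really is the issue, the fix might look something like the
sketch below. ConsumerConnector is stubbed as a bare trait so the sketch stands alone (in the
real code it would be kafka.consumer.ConsumerConnector); the class and method names beyond
onStart()/onStop() are mine, purely illustrative:

```scala
// Sketch only: ConsumerConnector is stubbed so this compiles standalone.
// In KafkaInputDStream it would be kafka.consumer.ConsumerConnector.
trait ConsumerConnector { def shutdown(): Unit }

class KafkaReceiverSketch {
  // Held in a member so onStop() can reach it, mirroring onStart().
  var consumerConnector: ConsumerConnector = null

  def onStart(connector: ConsumerConnector): Unit = {
    consumerConnector = connector
  }

  // Proposed symmetry: shutting the consumer down is what (as I understand
  // it) commits the latest offsets back to Zookeeper.
  def onStop(): Unit = {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
  }
}
```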

2. There is a similar apparent asymmetry with executorPool, which is not released in
onStop(). (A further minor encumbrance is that it is a variable local to onStart() rather
than a class member variable.)
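
The executorPool half of that asymmetry could be sketched the same way, using only
java.util.concurrent (the class name and details here are mine, not Spark's):

```scala
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}

// Illustrative sketch: in KafkaInputDStream the pool is created inside
// onStart() and, as far as I can see, never shut down.
class ReceiverPoolSketch(numThreads: Int) {
  // A member rather than a local, so onStop() can release it.
  private var executorPool: ExecutorService = null

  def onStart(): Unit = {
    executorPool = Executors.newFixedThreadPool(numThreads)
    // ... one MessageHandler per Kafka stream would be submitted here ...
  }

  def onStop(): Unit = {
    if (executorPool != null) {
      executorPool.shutdown()
      executorPool.awaitTermination(10, TimeUnit.SECONDS)
      executorPool = null
    }
  }

  def isStopped: Boolean = executorPool == null
}
```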

3. Through my println() tracing and Akka debug-level logging, I'm not seeing NetworkReceiverActor
ever receive a StopReceiver message from ReceiverExecutor. From some poking around and testing,
it seems possible to successfully send any type of message to NetworkReceiverActor only prior
to that NetworkReceiverActor being serialized into an RDD on line 146 of NetworkInputTracker.scala.

Prior to the actor being put into the RDD, messages can be sent to it, but not after.
Is it possible that Akka actors are intolerant of being serialized?
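
Whether Akka ActorRefs specifically survive serialization I can't say, but the general hazard
is easy to demonstrate with plain Java serialization (no Akka involved; roundTrip is a helper
I made up to mimic what storing the actor in an RDD would do): a round trip produces a distinct
copy, so a message delivered to one instance never reaches the other.

```scala
import java.io._

// Illustration only -- plain java.io serialization, not Akka.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(obj)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

// A stand-in "mailbox": after a round trip, messages delivered to the
// copy never appear in the original.
val mailbox = new java.util.ArrayList[String]()
val serializedCopy = roundTrip(mailbox)
serializedCopy.add("StopReceiver")
println(mailbox.isEmpty)  // the original never saw the message
```

That is at least consistent with the behavior I'm seeing, whatever Akka's own serialization
support does with an ActorRef.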

4. I noticed a lot of "TODO" comments sprinkled throughout the code relating to shutdown/termination/cleanup.

My biggest concern is #3 above, because if my suppositions are correct, then there might be
some major re-architecting involved. The other issues I could probably fix on my own and commit.

import spark.streaming._

object SimpleSparkStreaming {
  @volatile var receivedStop = false
  val sentinelFile = new java.io.File("/home/mmalak/stop")

  def main(args: Array[String]) {
    val Array(master, zkQuorum, broker, group, topics, numThreads) = args

    // Nil for the jars argument is sufficient when running in local mode
    val ssc = new StreamingContext(master, "SimpleBeta", Seconds(1),
                                   System.getenv("SPARK_HOME"), Nil)

    ssc.queueStream(new scala.collection.mutable.Queue[spark.RDD[Int]] += ssc.sparkContext.makeRDD(List(1)))
      .foreach(rdd => println("receivedStop[" + receivedStop + "]"))

    ssc.start()

    while (!sentinelFile.exists) { Thread.sleep(1000) }
    println("Stop detected")
    receivedStop = true
    ssc.stop()
    println("Exiting main")
  }
}
