spark-dev mailing list archives

From Michael Malak <michaelma...@yahoo.com>
Subject Re: Kafka not shutting down cleanly; Actor serialization?
Date Mon, 09 Dec 2013 17:37:31 GMT
I haven't seen a response to my September question and was wondering if anyone had any insights
into the problem I was having cleanly shutting down Kafka.

Note: For the purposes of the Apache 2.0 license, regarding the contents of this and my earlier
message: THIS IS NOT A CONTRIBUTION.


________________________________
 From: Michael Malak <michaelmalak@yahoo.com>
To: "dev@spark.incubator.apache.org" <dev@spark.incubator.apache.org> 
Sent: Thursday, September 26, 2013 12:27 PM
Subject: Kafka not shutting down cleanly; Actor serialization?
 


Tathagata:


I don't believe Kafka streams are being shut down cleanly, which implies that the most recent
Kafka offsets are not being committed back to Zookeeper, which implies starting/restarting
a Spark Streaming process would result in duplicate events.

The simple Spark Streaming code (running in local mode) pasted at the end of this e-mail,
which uses a hard-coded queueStream as its only input stream, exits cleanly when the presence
of the sentinel file is detected. However, if the queueStream is replaced with a kafkaStream,
the process never exits (unless I put a System.exit() as the very last line -- to forcibly
kill all threads).

In attempting to understand the Kafka shutdown process, I traced through the Spark Streaming
codebase with println()s. I noticed the following:

1. Although KafkaInputDStream.scala initializes the class member variable consumerConnector
in onStart(), I don't see a corresponding consumerConnector.shutdown() anywhere, such as in
onStop(). It is my understanding that it is the consumer's shutdown() that commits the offsets
back to Zookeeper. See the Kafka example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#ConsumerGroupExample-FullSourceCode

2. There is a similar apparent asymmetry with executorPool, which is not released in
onStop(). (A further minor encumbrance is that it is a variable local to onStart() rather
than a class member variable.)
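
For concreteness, here is a rough sketch of the symmetric teardown I have in mind. To be clear, this is my assumption of what an onStop() might look like, not actual branch-0.8 code; in particular, it assumes executorPool has been promoted from an onStart() local to a class member, which it currently is not:

```scala
// Hypothetical members of the receiver in KafkaInputDStream.scala (branch-0.8):
var consumerConnector: kafka.consumer.ConsumerConnector = null
var executorPool: java.util.concurrent.ExecutorService = null

def onStop() {
  if (consumerConnector != null) {
    // As I understand it, shutdown() is what commits the latest
    // consumed offsets back to Zookeeper
    consumerConnector.shutdown()
    consumerConnector = null
  }
  if (executorPool != null) {
    executorPool.shutdown()
    executorPool = null
  }
}
```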

3. Through my println() tracing and Akka debug-level logging, I'm not seeing NetworkReceiverActor
ever receiving a StopReceiver message from ReceiverExecutor. From some poking around and testing,
it seems possible to send messages of any type to NetworkReceiverActor successfully only prior
to that NetworkReceiverActor being serialized into an RDD on line 146 of NetworkInputTracker.scala
https://github.com/mesos/spark/blob/branch-0.8/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala#L146

Prior to the actor being put into the RDD, messages can be sent to the actor, but not after
the actor is put into the RDD. Is it possible that Akka actors are intolerant of being serialized?
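
As a minimal illustration of the general hazard (plain java.io serialization, no Akka or Spark involved, so this only suggests rather than proves the actor case): round-tripping an object through serialization yields a distinct copy, so anything done to the original afterward is invisible to the copy.

```scala
import java.io._

object SerializationCopyDemo {
  class Counter extends Serializable { var n = 0 }

  // Serialize obj to bytes and deserialize it back, yielding a copy
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]) {
    val original = new Counter
    val copy = roundTrip(original)
    original.n = 42           // "message" delivered after serialization
    println(copy.n)           // the deserialized copy never sees it: prints 0
    println(copy eq original) // prints false: a distinct object
  }
}
```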

4. I noticed a lot of "TODO" comments sprinkled throughout the code relating to shutdown/termination/cleanup.

My biggest concern is #3 above, because if my suppositions are correct, then there might be
some major re-architecting involved. The other issues I could probably fix on my own and commit
back.


import spark.streaming._

object SimpleSparkStreaming {
  @volatile var receivedStop = false
  val sentinelFile = new java.io.File("/home/mmalak/stop")

  def main(args: Array[String]) {
    val Array(master, zkQuorum, broker, group, topics, numThreads) = args

    sentinelFile.delete

    val ssc = new StreamingContext(master, "SimpleBeta", Seconds(1),
      System.getenv("SPARK_HOME"), Seq("./target/scala-2.9.2/my.jar"))
    ssc.checkpoint("/home/mmalak/checkpointing")

    ssc.queueStream(
      new scala.collection.mutable.Queue[spark.RDD[Int]] += ssc.sparkContext.makeRDD(List(1))
    ).foreach(rdd => println("receivedStop[" + receivedStop + "]"))
    ssc.start()

    while (!sentinelFile.exists) {Thread.sleep(1000)}
    println("Stop detected")
    receivedStop = true
    ssc.stop()
    println("Exiting main")
  }
}