spark-issues mailing list archives

From "amit kumar (JIRA)" <>
Subject [jira] [Created] (SPARK-20671) Processing multiple kafka topics with single spark streaming context hangs on batchSubmitted.
Date Tue, 09 May 2017 05:53:04 GMT
amit kumar created SPARK-20671:

             Summary: Processing multiple kafka topics with single spark streaming context hangs on batchSubmitted.
                 Key: SPARK-20671
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.0.0
         Environment: Ubuntu
            Reporter: amit kumar

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.scheduler._
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object SparkMain extends App {
 System.setProperty("", "") // property name and value elided in the original report
 val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark")
   .set("spark.streaming.concurrentJobs", "4")
 val sc = new SparkContext(conf)
 val ssc = new StreamingContext(sc, Seconds(5))
 val sqlContext = new SQLContext(sc)
 val host = "localhost:2181"
 val topicList = List("test", "fb")
 topicList.foreach { topic =>
   // One receiver-based stream per topic; the topic name doubles as the consumer group id.
   val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
   //configureStream(topic, lines)
   // The sink call was truncated in the archive; saveToCassandra is inferred from SomeColumns.
   lines.foreachRDD(rdd => rdd.saveToCassandra("test", "rawdata", SomeColumns("key")))
 }
 ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
   println("Batch completed, total delay: " + batchCompleted.batchInfo.totalDelay.get + " ms")
  }
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
   println("inside onReceiverStarted")
  }
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
   println("inside onReceiverError")
  }
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
   println("inside onReceiverStopped")
  }
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
   println("inside onBatchSubmitted")
  }
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
   println("inside onBatchStarted")
  }
 })
 ssc.start()            // elided in the archived report; required to run the streams
 ssc.awaitTermination()
}

case class test(key: String)

If I run any one of the topics at a time, each topic works on its own. But when the topic list
has more than one topic, then after getting the DStreams from the Kafka topics, the job hangs and keeps printing "inside onBatchSubmitted".
Thanks in advance.
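A likely cause (a sketch of a common starvation pattern, not confirmed from this report): `KafkaUtils.createStream` creates one receiver per call, and each receiver permanently occupies one task slot. With master `local[2]` and two topics, both threads are taken by receivers, leaving none to process batches, so every batch stalls right after submission. Two workarounds, assuming the receiver-based API used above (the group id `"kafkaspark-group"` is a hypothetical name):

```scala
// Option 1: give the local master more threads than there are receivers.
val conf = new SparkConf()
  .setMaster("local[*]")   // or local[N] with N > number of receivers
  .setAppName("kafkaspark")

// Option 2: consume both topics through a single receiver instead of one per topic.
val lines = KafkaUtils.createStream(ssc, host, "kafkaspark-group",
  Map("test" -> 1, "fb" -> 1)).map(_._2)
```

Either change leaves at least one free core for batch processing, which would explain why each topic works individually but two together hang.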

This message was sent by Atlassian JIRA

