spark-issues mailing list archives

From "amit kumar (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-20671) Processing multiple kafka topics with single spark streaming context hangs on batchSubmitted.
Date Tue, 09 May 2017 05:53:04 GMT
amit kumar created SPARK-20671:
----------------------------------

             Summary: Processing multiple kafka topics with single spark streaming context hangs on batchSubmitted.
                 Key: SPARK-20671
                 URL: https://issues.apache.org/jira/browse/SPARK-20671
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.0.0
         Environment: Ubuntu
            Reporter: amit kumar



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.scheduler._
import com.datastax.spark.connector._

object SparkMain extends App {
  // Cassandra connection for the spark-cassandra-connector.
  System.setProperty("spark.cassandra.connection.host", "127.0.0.1")

  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("kafkaspark")
    .set("spark.streaming.concurrentJobs", "4")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(5))
  val sqlContext = new SQLContext(sc)

  val host = "localhost:2181" // ZooKeeper quorum for the receiver-based Kafka stream
  val topicList = List("test", "fb")

  // One receiver-based input stream per topic.
  topicList.foreach { topic =>
    val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
    //configureStream(topic, lines)
    lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
  }

  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
    }
    override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
      println("inside onReceiverStarted")
    }
    override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
      println("inside onReceiverError")
    }
    override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
      println("inside onReceiverStopped")
    }
    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
      println("inside onBatchSubmitted")
    }
    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
      println("inside onBatchStarted")
    }
  })

  ssc.start()
  println("===========================")
  ssc.awaitTermination()
}

case class test(key: String)


========
If I run any single topic at a time, each topic works. But when the topic list has more
than one topic, then after the DStreams are created from the Kafka topics it just keeps printing "inside onBatchSubmitted" and the batches never start.
Thanks in advance.
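
========
A likely explanation, not confirmed in this report: with the receiver-based KafkaUtils.createStream, each input stream starts a long-running receiver that permanently occupies one executor core. Under setMaster("local[2]"), two topics mean two receivers holding both cores, so batches get submitted but never find a free core to run on; the Spark Streaming docs require more local cores than receivers for exactly this reason. Below is a minimal sketch of that workaround, reusing the topics and ZooKeeper address from the report (the object name MultiTopicSketch and the variable names are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiTopicSketch extends App {
  // Two receivers need two dedicated cores, plus at least one core
  // left over to actually process the submitted batches.
  val conf = new SparkConf().setMaster("local[4]").setAppName("kafkaspark")
  val ssc = new StreamingContext(new SparkContext(conf), Seconds(5))

  val host = "localhost:2181"        // ZooKeeper quorum, as in the report
  val topicList = List("test", "fb") // topics from the report

  // One receiver-based stream per topic, then union them so the rest
  // of the pipeline deals with a single DStream.
  val perTopicStreams = topicList.map { topic =>
    KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
  }
  val allLines = ssc.union(perTopicStreams)
  allLines.print()

  ssc.start()
  ssc.awaitTermination()
}

With receiver-based streams, the rule of thumb is local[n] with n strictly greater than the number of receivers; the direct approach (KafkaUtils.createDirectStream) sidesteps dedicated receiver cores entirely.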



