spark-user mailing list archives

From Jatin Kumar <jku...@rocketfuelinc.com.INVALID>
Subject Spark streaming from Kafka best fit
Date Tue, 01 Mar 2016 08:36:48 GMT
Hello all,

I see that, as of today, there are three ways to read from Kafka in Spark
streaming (a rough sketch of the direct approach is included below):
1. KafkaUtils.createStream() (here
<https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
2. KafkaUtils.createDirectStream() (here
<https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
3. Kafka-spark-consumer (here
<https://github.com/dibbhatt/kafka-spark-consumer>)
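
For reference, the direct approach (#2) is wired up roughly like this (a
minimal sketch only; ssc is the StreamingContext from the code below, and
the topic name and broker address are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) stream: one RDD partition per Kafka partition,
// with offsets tracked by Spark itself rather than by a receiver + ZooKeeper.
val directKafkaParams = Map[String, String]("metadata.broker.list" -> "kafka.server.ip:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, directKafkaParams, Set("logs-topic"))
val directLines = directStream.map(_._2)  // keep only the message values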

My Spark Streaming application has to read from one Kafka topic with around
224 partitions, consuming data at around 150 MB/s (~90,000 messages/sec),
which reduces to around 3 MB/s (~1,400 messages/sec) after filtering. After
filtering I need to maintain the top 10,000 URL counts. I don't really care
about exactly-once semantics, as I am only interested in a rough estimate.
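
To put those numbers in perspective, this is the rough per-partition and
per-batch arithmetic I am working from (nothing new, just the figures above):

val kafkaPartitions = 224
val mbPerSec        = 150.0    // into Kafka, before filtering
val msgsPerSec      = 90000.0
val batchSeconds    = 120

val mbPerSecPerPartition   = mbPerSec / kafkaPartitions      // ~0.67 MB/s per Kafka partition
val msgsPerSecPerPartition = msgsPerSec / kafkaPartitions    // ~400 messages/sec per partition
val gbPerBatch             = mbPerSec * batchSeconds / 1024  // ~17.6 GB arriving per 120-second batch
val msgsPerBatch           = msgsPerSec * batchSeconds       // ~10.8 million messages per batch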

Code:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")  // WAL off; exactly-once not needed
sparkConf.setAppName("KafkaReader")
val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "kafka.server.ip:9092",
  "zookeeper.connect" -> "zk.server.ip:2181",  // placeholder; the receiver-based API (#1) reads offsets via ZooKeeper
  "group.id" -> consumer_group
)

// N receiver-based input streams (#1), each backed by its own long-running receiver
val lineStreams = (1 to N).map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
}

ssc.union(
  lineStreams.map(stream => {
    stream.map(ParseStringToLogRecord)
      .filter(record => isGoodRecord(record))
      .map(record => record.url)
  })
).window(Seconds(120), Seconds(120))                       // 2 minute window
  .countByValueAndWindow(Seconds(1800), Seconds(120), 28)  // 30 minute moving window; 28 partitions should help parallelism
  .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHOLD)
  .mapPartitions(iter => {
    // keep only the 1000 highest counts from each partition
    iter.toArray.sortWith((r1, r2) => r1._2 > r2._2).slice(0, 1000).iterator
  }, true)
  .foreachRDD((latestRDD, rddTime) => {
    printTopFromRDD(rddTime,
      latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
  })

ssc.start()
ssc.awaitTermination()

Questions:

a) I used #2, but I found that I couldn't control how many executors actually
fetch from Kafka. How do I keep a balance between the executors that receive
data from Kafka and the ones that process it? Do they keep changing for
every batch?
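
My understanding is that with #2 there are no dedicated receivers and the
Kafka fetch happens inside each batch's tasks (one task per Kafka partition),
so what I am really asking is whether an explicit repartition, roughly like
the sketch below (building on the direct-stream snippet above; 28 is just a
guess), is the right way to separate fetching from processing:

val parsedUrls = directStream
  .map(_._2)
  .map(ParseStringToLogRecord)
  .filter(record => isGoodRecord(record))
  .map(record => record.url)
  .repartition(28)  // processing parallelism, decoupled from the 224 fetch tasks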

b) Now I am trying #1, creating multiple DStreams, filtering them and then
doing a union. I don't understand why the number of events processed per
120-second batch changes so drastically. Please find attached the events/sec
graph from a run with 1 receiver. How do I debug this?
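
For what it's worth, these are the rate-control settings I plan to experiment
with while debugging (the numbers are arbitrary placeholders):

sparkConf.set("spark.streaming.backpressure.enabled", "true")       // let Spark adapt the ingestion rate
sparkConf.set("spark.streaming.receiver.maxRate", "5000")           // records/sec cap per receiver, for #1
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records/sec cap per partition, for #2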

c) Of the three options above, which is the most suitable method for
integrating with Kafka? Any recommendations for getting maximum performance
and for running the streaming application reliably in a production environment?

--
Thanks
Jatin Kumar
