spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-21561) spark-streaming-kafka-010 DStream is not pulling anything from Kafka
Date Fri, 28 Jul 2017 18:13:03 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-21561.
-------------------------------
    Resolution: Invalid

This isn't a place to ask for input on your code -- you'd have to show a reproducible bug here that you've narrowed down.

> spark-streaming-kafka-010 DStream is not pulling anything from Kafka
> --------------------------------------------------------------------
>
>                 Key: SPARK-21561
>                 URL: https://issues.apache.org/jira/browse/SPARK-21561
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.1
>            Reporter: Vlad Badelita
>              Labels: kafka-0.10, spark-streaming
>
> I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka topic (broker
> version 0.10). I have checked that messages are being produced, and I pulled them successfully
> with a plain KafkaConsumer (a rough sketch of that check is at the end of this report). Now,
> when I try to use the Spark Streaming API, I am not getting anything. If I just use
> KafkaUtils.createRDD and specify some offset ranges manually, it works (see the sketch further
> below). But when I try to use createDirectStream, all the RDDs are empty, and when I check the
> partition offsets it simply reports 0 for every partition. Here is what I tried:
> {code:scala}
>  import org.apache.kafka.common.serialization.StringDeserializer
>  import org.apache.spark.{SparkConf, TaskContext}
>  import org.apache.spark.streaming.{Seconds, StreamingContext}
>  import org.apache.spark.streaming.kafka010._
>  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>  val sparkConf = new SparkConf().setAppName("kafkastream")
>  val ssc = new StreamingContext(sparkConf, Seconds(3))
>  val topics = Array("my_topic")
>  val kafkaParams = Map[String, Object](
>    "bootstrap.servers" -> "hostname:6667",
>    "key.deserializer" -> classOf[StringDeserializer],
>    "value.deserializer" -> classOf[StringDeserializer],
>    "group.id" -> "my_group",
>    "auto.offset.reset" -> "earliest",
>    "enable.auto.commit" -> (true: java.lang.Boolean)
>  )
>  val stream = KafkaUtils.createDirectStream[String, String](
>    ssc,
>    PreferConsistent,
>    Subscribe[String, String](topics, kafkaParams)
>  )
>  stream.foreachRDD { rdd =>
>    // offset ranges must be captured on the driver before executor-side use
>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>    rdd.foreachPartition { iter =>
>      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>    }
>    val rddCount = rdd.count()
>    println(s"rdd count: $rddCount")
>    // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>  }
>  ssc.start()
>  ssc.awaitTermination()
> {code}
> All partitions show offset ranges from 0 to 0 and all RDDs are empty. I would like the
> stream to start from the beginning of each partition and also pick up everything that is
> produced to it afterwards.
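> For comparison, the createRDD version mentioned above, which does return messages, looked
> roughly like this (the topic/partition/offset values here are illustrative, not the exact
> ones I used):
> {code:scala}
>  import scala.collection.JavaConverters._
>  import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}
>  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
>  // createRDD takes the consumer params as a java.util.Map and a fixed set of
>  // offset ranges; partition 0, offsets 0-100 are example values only
>  val ranges = Array(OffsetRange("my_topic", 0, 0L, 100L))
>  val rdd = KafkaUtils.createRDD[String, String](
>    ssc.sparkContext, kafkaParams.asJava, ranges, PreferConsistent)
>  println(s"createRDD count: ${rdd.count()}")
> {code}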
> I have also tried spark-streaming-kafka-0.8 and it does work, so I think this is a 0.10-specific
> issue, because everything else works fine. Thank you!
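> For completeness, the plain-consumer check mentioned at the top looked roughly like this
> (the group id and poll timeout are reconstructed examples; a group id different from my_group
> avoids skipping past offsets that earlier runs may already have committed):
> {code:scala}
>  import java.util.{Arrays, Properties}
>  import org.apache.kafka.clients.consumer.KafkaConsumer
>
>  val props = new Properties()
>  props.put("bootstrap.servers", "hostname:6667")
>  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>  props.put("group.id", "my_group_check")  // example value, distinct from my_group
>  props.put("auto.offset.reset", "earliest")
>
>  val consumer = new KafkaConsumer[String, String](props)
>  consumer.subscribe(Arrays.asList("my_topic"))
>  val records = consumer.poll(5000)  // timeout in ms; value is an example
>  println(s"plain consumer fetched ${records.count()} records")
>  consumer.close()
> {code}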


