spark-issues mailing list archives

From "Nicholas Verbeck (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions
Date Tue, 10 Apr 2018 22:41:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433100#comment-16433100 ]

Nicholas Verbeck commented on SPARK-19680:
------------------------------------------

KAFKA-3370 is a good solution, from a central point, to the problem of badly performing jobs.

However, regardless of that, Spark shouldn't just dictate functionality to users like that. It should instead leave it up to the user to assume responsibility if they wish to enable that setting, with notes and comments in Spark's docs about the potential issues, until either Kafka and/or Spark can come up with a solution that isn't the removal of functionality.

A side solution, until Kafka can be fixed, would be for Spark to evaluate the setting itself. If it is set, Spark would look up the current offsets at start, move them to latest/earliest as requested, and then switch to NONE for the continued run. This would prevent the issues you appear to be wanting to prevent, while letting users keep at least part of the functionality they are looking for.
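A rough sketch of what that could look like, done on the user side with the existing 0-10 direct stream integration. This is only a sketch: it assumes kafka-clients 0.10.1+ (for beginningOffsets/endOffsets) and reuses the ReadFromTopicName and kafkaParams names from the report below.

{code:title=StartingOffsets.scala|borderStyle=solid}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Resolve starting offsets on the driver, honouring the requested reset
// ("earliest" or "latest"), so the executor-side "none" policy never has to
// resolve an out-of-range offset on its own.
def resolveStartingOffsets(kafkaParams: Map[String, Object],
                           topic: String,
                           reset: String): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    kafkaParams.asJava, new ByteArrayDeserializer, new ByteArrayDeserializer)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic(), p.partition())).asJava
    val resolved =
      if (reset == "earliest") consumer.beginningOffsets(partitions)
      else consumer.endOffsets(partitions)
    resolved.asScala.map { case (tp, o) => tp -> o.longValue() }.toMap
  } finally {
    consumer.close()
  }
}

// Usage in main(), instead of the plain Subscribe call:
//   val startingOffsets = resolveStartingOffsets(kafkaParams, ReadFromTopicName, "earliest")
//   val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent,
//     Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, startingOffsets))
{code}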

 

 

> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>            Priority: Major
>
> I'm using Spark Streaming with Kafka to create a top list. I want to read all the messages in Kafka, so I set
>    "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it is not working and I get:
> Error:
> {code:title=error.log|borderStyle=solid}
> 	Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
>     Map(
>       "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>       "group.id" -> properties.getProperty("kafka.consumer.group"),
>       "auto.offset.reset" -> "earliest",
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "enable.auto.commit" -> "false",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]],
>       windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>     getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>       val topList = new TopList
>       topList.setCreated(new Date())
>       topList.setTopListEntryList(rdd.take(TopListLength).toList)
>       CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>       kafkaSink.value.send(SendToTopicName, topList)
>       CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>     })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int,
>       slideDuration: Int): DStream[TopListEntry] = {
>     val Mapper = MapperObject.readerFor[SearchEventDTO]
>     result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>       .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null &&
>         s.getVertical == Vertical.BAP && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>       .map(row => {
>         val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>         (name, new TopListEntry(name, 1, row.getResultCount))
>       })
>       .reduceByKeyAndWindow(
>         (a: TopListEntry, b: TopListEntry) =>
>           new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>         (a: TopListEntry, b: TopListEntry) =>
>           new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>         Minutes(windowDuration),
>         Seconds(slideDuration))
>       .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>       .map(row => (row._2.getSearchCount, row._2))
>       .transform(rdd => rdd.sortByKey(ascending = false))
>       .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount,
>         row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
>     val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>     val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>     val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>     val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>     ssc.checkpoint("/home/spark/checkpoints")
>     val adEventStream =
>       KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent,
>         Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>     processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties),
>       SparkUtil.getSlideDuration(properties), kafkaSink)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> As I saw in the KafkaUtils code:
> {code:title=KafkaUtils.scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means that as soon as one worker has a Kafka partition that can no longer be processed, because its offset is not valid anymore due to the retention policy, the streaming job will stop working.
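Until that changes, an application that tracks its own offsets can clamp them against what the broker still retains before handing them to the Subscribe overload that takes explicit offsets, so the executor never requests an offset that retention has already deleted. A rough sketch along those lines; storedOffsets is a hypothetical application-managed map, and kafka-clients 0.10.1+ is assumed for beginningOffsets:

{code:title=ClampOffsets.scala|borderStyle=solid}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Clamp application-managed offsets to the earliest offset the broker still
// retains, so Subscribe(..., offsets) never starts behind the retention window.
def clampToRetained(kafkaParams: Map[String, Object],
                    storedOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    kafkaParams.asJava, new ByteArrayDeserializer, new ByteArrayDeserializer)
  try {
    val earliest = consumer.beginningOffsets(storedOffsets.keys.toList.asJava).asScala
    storedOffsets.map { case (tp, offset) =>
      // If retention already removed this offset, fall back to the earliest retained one.
      tp -> math.max(offset, earliest(tp).longValue())
    }
  } finally {
    consumer.close()
  }
}
{code}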




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

