spark-issues mailing list archives

From "Gabor Somogyi (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling
Date Thu, 16 Jul 2020 16:03:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159322#comment-17159322 ]

Gabor Somogyi commented on SPARK-32151:
---------------------------------------

I see; in this case Structured Streaming is not affected.

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
>                 Key: SPARK-32151
>                 URL: https://issues.apache.org/jira/browse/SPARK-32151
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Ed Mitchell
>            Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are cleared when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers joining the group (though it could), it does happen when the partition leader is re-elected because a Kafka node is stopped or decommissioned.
> This seems to occur only when users specify their own offsets and do not use Kafka as the persistent offset store (for example, when they keep offsets in their own database, and possibly when they rely on checkpointing).
> This may also affect Structured Streaming.
> This presents itself as a NoOffsetForPartitionException:
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 1589333820000 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>   at scala.Option.orElse(Option.scala:289)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> {code}
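To make the failure sequence concrete, here is a dependency-free Java model of it: the driver seeks to offsets loaded from its own store, a rebalance revokes and then reassigns the partition, and the next poll finds no position. It is a sketch, not Kafka's implementation; it does not use kafka-clients, and `RevokeFailureModel`, `ModelConsumer` and the local `TopicPartition` record are hypothetical stand-ins. It assumes, as in the trace, that nothing is committed to Kafka and no offset reset policy is configured.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Dependency-free model of the failure; it does not use kafka-clients.
// Assumes offsets live only in the user's own store (nothing committed to
// Kafka) and that no offset reset policy is configured.
final class RevokeFailureModel {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Hypothetical consumer that tracks assignment and fetch positions only.
    static final class ModelConsumer {
        private final Set<TopicPartition> assigned = new LinkedHashSet<>();
        private final Map<TopicPartition, Long> positions = new HashMap<>();

        void assign(Collection<TopicPartition> parts) {
            assigned.addAll(parts);
        }

        void seek(TopicPartition tp, long offset) {
            positions.put(tp, offset);
        }

        // Revocation forgets any position the driver had seeked to.
        void revoke(Collection<TopicPartition> parts) {
            assigned.removeAll(parts);
            positions.keySet().removeAll(parts);
        }

        // An assigned partition with no position cannot be fetched when there
        // is neither a committed offset nor a reset policy to fall back on.
        Map<TopicPartition, Long> poll() {
            List<TopicPartition> missing = new ArrayList<>();
            for (TopicPartition tp : assigned) {
                if (!positions.containsKey(tp)) {
                    missing.add(tp);
                }
            }
            if (!missing.isEmpty()) {
                throw new IllegalStateException(
                        "Undefined offset with no reset policy for partitions: " + missing);
            }
            return Map.copyOf(positions);
        }
    }

    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("production-ad-metrics", 0);
        ModelConsumer consumer = new ModelConsumer();
        consumer.assign(List.of(p0));
        consumer.seek(p0, 42L); // offset loaded from the user's own database
        System.out.println("before rebalance: " + consumer.poll());

        // A rebalance revokes the partition and then assigns it back.
        consumer.revoke(List.of(p0));
        consumer.assign(List.of(p0));
        try {
            consumer.poll();
        } catch (IllegalStateException e) {
            System.out.println("after rebalance: " + e.getMessage());
        }
    }
}
```

The first poll succeeds because the position was set by `seek`; the second fails because the reassignment left the partition without a position and there is nothing to fall back on.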
> This can be fixed by allowing the user to pass an org.apache.kafka.clients.consumer.ConsumerRebalanceListener to the KafkaConsumer#subscribe method.
> The ConsumerRebalanceListener documentation notes that KafkaConsumer#seek can be called with offsets fetched from an external store when partitions are assigned.
> I'm suggesting a new ConsumerStrategy that lets users supply a function that fetches offsets for a Collection of TopicPartitions. This would spare Spark users from having to interact with the Kafka API directly.
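A minimal sketch of the proposal, under stated assumptions: the user supplies a function from a collection of partitions to offsets, and an assignment callback shaped like ConsumerRebalanceListener#onPartitionsAssigned seeks to whatever that function returns. None of these types exist in Spark or kafka-clients; `OffsetFetchingStrategySketch`, `RestoringListener` and the local `TopicPartition` record are hypothetical, and the `seek` callback stands in for KafkaConsumer#seek so the example runs without a broker.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch of the proposed strategy; none of these types exist in
// Spark or kafka-clients. It runs without a broker.
final class OffsetFetchingStrategySketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Mirrors the shape of ConsumerRebalanceListener's two callbacks.
    static final class RestoringListener {
        // The user-supplied function from the proposal: partitions -> offsets.
        private final Function<Collection<TopicPartition>, Map<TopicPartition, Long>> fetchOffsets;
        // Stands in for KafkaConsumer#seek(TopicPartition, long).
        private final BiConsumer<TopicPartition, Long> seek;

        RestoringListener(Function<Collection<TopicPartition>, Map<TopicPartition, Long>> fetchOffsets,
                          BiConsumer<TopicPartition, Long> seek) {
            this.fetchOffsets = fetchOffsets;
            this.seek = seek;
        }

        void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Offsets are persisted in the user's own store, so nothing to save here.
        }

        void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Re-seek every reassigned partition the store knows about, so the
            // next poll does not find an undefined position.
            Map<TopicPartition, Long> offsets = fetchOffsets.apply(partitions);
            for (TopicPartition tp : partitions) {
                Long offset = offsets.get(tp);
                if (offset != null) {
                    seek.accept(tp, offset);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical external offset store, e.g. the user's own database.
        Map<TopicPartition, Long> database = new HashMap<>();
        database.put(new TopicPartition("production-ad-metrics", 0), 100L);
        database.put(new TopicPartition("production-ad-metrics", 1), 250L);

        Map<TopicPartition, Long> positions = new HashMap<>();
        RestoringListener listener = new RestoringListener(
                parts -> {
                    Map<TopicPartition, Long> found = new HashMap<>();
                    for (TopicPartition tp : parts) {
                        if (database.containsKey(tp)) {
                            found.put(tp, database.get(tp));
                        }
                    }
                    return found;
                },
                positions::put);

        // Simulate a rebalance: revoke, then reassign both partitions.
        listener.onPartitionsRevoked(database.keySet());
        listener.onPartitionsAssigned(database.keySet());
        System.out.println(positions);
    }
}
```

In this sketch, partitions for which the function returns no offset are simply left alone; a real strategy would still need a policy for them, such as falling back to the configured reset behaviour.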



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

