spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Problems with new experimental Kafka Consumer for 0.10
Date Fri, 07 Oct 2016 01:35:20 GMT
OK, so at this point, even without involving commitAsync, you're
seeing consumer rebalances after a particular batch takes longer than
the session timeout?

Do you have a minimal code example you can share?

On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
<matthias.niehoff@codecentric.de> wrote:
> Hi,
> sorry for the late reply; there was a public holiday in Germany.
>
> Yes, it's using a unique group id that no other job or consumer group is
> using. I have increased session.timeout.ms to 1 minute and set
> max.poll.records to 1000. The processing takes ~1 second.
>
> 2016-09-29 4:41 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
>>
>> Well, I'd start at the first thing suggested by the error, namely that
>> the group has rebalanced.
>>
>> Is that stream using a unique group id?
>>
>> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
>> <matthias.niehoff@codecentric.de> wrote:
>> > Hi,
>> >
>> > the stacktrace:
>> >
>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> > completed since the group has already rebalanced and assigned the partitions
>> > to another member. This means that the time between subsequent calls to
>> > poll() was longer than the configured session.timeout.ms, which typically
>> > implies that the poll loop is spending too much time message processing. You
>> > can address this either by increasing the session timeout or by reducing the
>> > maximum size of batches returned in poll() with max.poll.records.
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>> >     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
>> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >     at scala.Option.orElse(Option.scala:289)
>> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>> >     at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >     at scala.Option.orElse(Option.scala:289)
>> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>> >     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> >     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>> >     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>> >     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>> >     at scala.util.Try$.apply(Try.scala:192)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>> >     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> >
>> > But it seems like the commit is not the actual problem. The job also falls
>> > behind if I do not commit the offsets. The delay would be understandable if
>> > the processing time exceeded the batch interval, but that's not the case in
>> > any of the microbatches. My guess is that, for some reason, one of the
>> > microbatches falls behind by more than session.timeout.ms. The consumer then
>> > rejoins the group, which takes about 1 minute (see timestamps below). This
>> > starts a cycle of slow batches, each triggering another consumer rebalance.
>> > Would this be possible?
>> >
>> >
>> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time
>> > 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
>> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group
>> > spark_aggregation_job-kafka010 with generation 6
>> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned
>> > partitions [sapxm.adserving.log.ad_request-0,
>> > sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1,
>> > sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3,
>> > sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5,
>> > sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7,
>> > sapxm.adserving.log.ad_request-9] for group
>> > spark_aggregation_job-kafka010
>> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned
>> > partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4,
>> > sapxm.adserving.log.view-1, sapxm.adserving.log.view-2,
>> > sapxm.adserving.log.view-0, sapxm.adserving.log.view-9,
>> > sapxm.adserving.log.view-7, sapxm.adserving.log.view-8,
>> > sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group
>> > spark_aggregation_job-kafka010
>> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group
>> > spark_aggregation_job-kafka010
>> >
>> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <cody@koeninger.org>:
>> >>
>> >> What's the actual stacktrace / exception you're getting related to
>> >> commit failure?
>> >>
>> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
>> >> <matthias.niehoff@codecentric.de> wrote:
>> >> > Hi everybody,
>> >> >
>> >> > I am using the new Kafka consumer for Spark Streaming in my job. With
>> >> > the old consumer the job runs fine.
>> >> >
>> >> > The job consumes 3 topics, saves the data to Cassandra, cogroups the
>> >> > topics, calls mapWithState, and stores the results in Cassandra. After
>> >> > that I manually commit the Kafka offsets using the commitAsync method of
>> >> > the KafkaDStream.
>> >> >
>> >> > With the new consumer I experience the following problem:
>> >> >
>> >> > After a certain amount of time (about 4-5 minutes, might be more or
>> >> > less), exceptions report that the offset commit failed. The processing
>> >> > takes less than the batch interval. I also adjusted session.timeout.ms
>> >> > and request.timeout.ms as well as the max.poll.records setting, which
>> >> > did not help.
>> >> >
>> >> > After the first offset commit fails, the time it takes from Kafka until
>> >> > the microbatch is started increases, although the processing time stays
>> >> > consistently below the batch interval. Moreover, further offset commits
>> >> > also fail, and as a result the delay builds up.
>> >> >
>> >> > Has anybody else experienced this?
>> >> >
>> >> > Thank you
>> >> >
>> >> > Relevant Kafka Parameters:
>> >> >
>> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
>> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
>> >> > "auto.offset.reset" -> "largest",
>> >> > "enable.auto.commit" -> "false",
>> >> > "max.poll.records" -> "1000"
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Matthias Niehoff | IT-Consultant | Agile Software Factory  |
>> >> > Consulting
>> >> > codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> >> > tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49
>> >> > (0)
>> >> > 172.1702676
>> >> > www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> >> > www.more4fi.de
>> >> >
>> >> > Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>> >> > Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> >> > Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen
>> >> > Schütz
>> >> >
>> >> > This e-mail, including any attached files, contains confidential and/or
>> >> > legally protected information. If you are not the intended recipient or
>> >> > have received this e-mail in error, please inform the sender immediately
>> >> > and delete this e-mail and any attached files without delay.
>> >> > Unauthorized copying, use, or opening of any attached files, as well as
>> >> > unauthorized forwarding of this e-mail, is not permitted.

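For reference, the consumer settings quoted in the thread can be sketched as a plain Scala map. This is only an illustration, not the poster's actual job code: the `bootstrap.servers` address, the deserializer choice, and the `heartbeat.interval.ms` value are assumptions that do not appear in the thread, and `auto.offset.reset` is written as `latest` here because the new consumer accepts `latest`, `earliest`, or `none` rather than the old consumer's `largest`. The small check encodes the ordering that, as far as I know, the 0.10 consumer expects between the three timeouts.

```scala
object KafkaParamsSketch {
  // Timeouts in milliseconds, mirroring the values quoted in the thread.
  val sessionTimeoutMs: Int = 1 * 60 * 1000
  val requestTimeoutMs: Int = 2 * 60 * 1000
  // Assumption: not given in the thread; set to one third of the session timeout.
  val heartbeatIntervalMs: Int = sessionTimeoutMs / 3

  val kafkaParams: Map[String, Object] = Map(
    // Assumption: placeholder broker address.
    "bootstrap.servers" -> "localhost:9092",
    // Group id as it appears in the quoted log lines.
    "group.id" -> "spark_aggregation_job-kafka010",
    // Assumption: string keys and values; the thread does not say.
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "session.timeout.ms" -> sessionTimeoutMs.toString,
    "request.timeout.ms" -> requestTimeoutMs.toString,
    "heartbeat.interval.ms" -> heartbeatIntervalMs.toString,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> "false",
    "max.poll.records" -> "1000"
  )

  // heartbeat.interval.ms < session.timeout.ms < request.timeout.ms
  def timeoutsConsistent: Boolean =
    heartbeatIntervalMs < sessionTimeoutMs && sessionTimeoutMs < requestTimeoutMs

  def main(args: Array[String]): Unit = {
    kafkaParams.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k = $v") }
    println(s"timeouts consistent: $timeoutsConsistent")
  }
}
```

A map like this is what `KafkaUtils.createDirectStream` in spark-streaming-kafka-0-10 receives through `ConsumerStrategies.Subscribe`; that wiring is left out so the sketch runs without Spark or Kafka on the classpath. The broker additionally bounds the allowed session timeout through `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, so a larger client value may need a matching broker setting.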

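The timestamps quoted in the thread can be checked with a little arithmetic. The sketch below uses only figures from the log excerpt (total delay 141.580 s, execution 0.360 s, a JobScheduler line at 08:15:55 followed by a successful group join at 08:16:48) and the 1 minute session timeout; the object and value names are made up for illustration.

```scala
import java.time.{Duration, LocalTime}

object DelayBreakdown {
  // Figures copied from the JobScheduler log line, converted to milliseconds.
  val totalDelayMs: Long = 141580L
  val executionMs: Long = 360L
  // session.timeout.ms from the quoted parameters (1 minute).
  val sessionTimeoutMs: Long = 1 * 60 * 1000L

  // Time the batch spent waiting before it started executing.
  val schedulingDelayMs: Long = totalDelayMs - executionMs

  // Gap between the JobScheduler line and the "Successfully joined group" line.
  val rejoinGapSeconds: Long =
    Duration.between(LocalTime.parse("08:15:55"), LocalTime.parse("08:16:48")).getSeconds

  def main(args: Array[String]): Unit = {
    println(s"scheduling delay: $schedulingDelayMs ms")
    println(s"exceeds session timeout: ${schedulingDelayMs > sessionTimeoutMs}")
    println(s"gap before rejoin completed: $rejoinGapSeconds s")
  }
}
```

With these numbers nearly all of the total delay is waiting rather than execution, and the wait is more than twice the session timeout, which is consistent with the description of short batches queuing behind a rebalance.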