spark-issues mailing list archives

From "zhaoshijie (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23125) Offset commit failed when spark-streaming batch time is more than kafkaParams session timeout.
Date Wed, 17 Jan 2018 10:13:03 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhaoshijie updated SPARK-23125:
-------------------------------
    Description: 
I found that the DirectKafkaInputDStream (kafka010) offset commit fails when the batch time is longer than the kafkaParams session.timeout.ms. The log is as follows:
{code:java}
2018-01-16 05:40:00,002 ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:
Offset commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured session.timeout.ms,
which typically implies that the poll loop is spending too much time message processing. You
can address this either by increasing the session timeout or by reducing the maximum size
of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:161)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:180)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:207)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}
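
A minimal sketch of a job that hits this (the broker address, group id, and topic below are placeholders); the batch interval only needs to exceed {{session.timeout.ms}}:

{code:scala}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object CommitFailureRepro {
  def main(args: Array[String]): Unit = {
    // Batch interval (5 minutes) is longer than session.timeout.ms below.
    val ssc = new StreamingContext(new SparkConf().setAppName("SPARK-23125-repro"), Minutes(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",              // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-23125-repro",                 // placeholder
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"                    // shorter than the batch interval
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("some-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd ...
      // commitAsync only queues the offsets; the driver's consumer sends the commit
      // the next time it polls Kafka, and that is where the CommitFailedException
      // above is raised once the group has already rebalanced.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}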
 

Although I think this is a Kafka issue, the Kafka 0.10.0.1 client cannot avoid it. The failure happens because the Kafka consumer group rebalances; the rebalance is driven by the session.timeout.ms parameter, but the official Kafka documentation notes:

 "Note that the value must be in the allowable range as configured in the broker configuration
by {{group.min.session.timeout.ms}} and {{group.max.session.timeout.ms}}."

So, if I want the Kafka offset commit to succeed, I must guarantee that my batch time is smaller than {{group.max.session.timeout.ms}} (default 300000 ms). That is unreasonable.
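
Concretely (illustrative values, not from the report), with the 0.10.0.1 client the only knob is {{session.timeout.ms}}, and the broker caps how far it can be raised:

{code:scala}
// With a Kafka 0.10.0.x consumer, session.timeout.ms is the only bound on the time
// between poll() calls, and the broker rejects values outside
// [group.min.session.timeout.ms, group.max.session.timeout.ms] (max defaults to 300000 ms).
val boundedParams = Map[String, Object](
  // ... other consumer settings ...
  "session.timeout.ms" -> "290000"  // must stay below the broker's group.max.session.timeout.ms
)
// So any batch interval above roughly 5 minutes cannot commit offsets with this client.
{code}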

I think we should upgrade the streaming Kafka client dependency from 0.10.0.1 to 0.10.2.0.
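
For context, Kafka 0.10.1.0 (KIP-62) moved consumer heartbeats to a background thread and added {{max.poll.interval.ms}}, so with a newer client the gap between poll() calls no longer has to fit inside {{session.timeout.ms}}. A sketch of the settings an upgraded client would accept (illustrative values):

{code:scala}
// Only meaningful once a Kafka 0.10.1.0+ client is on the classpath.
val upgradedParams = Map[String, Object](
  // ... other consumer settings ...
  "session.timeout.ms"   -> "30000",   // heartbeats now come from a background thread
  "max.poll.interval.ms" -> "900000"   // only needs to exceed the longest batch/processing time
)
{code}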

 

> Offset commit failed when spark-streaming batch time is more than kafkaParams session timeout.
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23125
>                 URL: https://issues.apache.org/jira/browse/SPARK-23125
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: zhaoshijie
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

