kafka-dev mailing list archives

From "Justin Miller (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4396) Seeing offsets not resetting even when reset policy is configured explicitly
Date Thu, 10 Nov 2016 06:09:58 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653148#comment-15653148 ]

Justin Miller commented on KAFKA-4396:
--------------------------------------

Hi huxi, thanks for responding.

I do have enable.auto.commit set to false, as I'm doing a

{code}
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
{code}

with a custom OffsetCommitCallback to verify that the offsets are committed without an exception. I haven't tried running it on a new consumer group, but I can try that tomorrow (though I would note that the problem only seems to manifest itself after it's processed a number of time batches). I do save all the parquet files generated for the time batch before I commit the offsets, and that process can take up to 8 minutes. Should I perhaps just commit the offsets and accept potential data loss if retried puts to S3 fail?
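
For reference, a minimal sketch of the kind of callback I'm describing (the logging and the commitCallback name are illustrative; OffsetCommitCallback and CanCommitOffsets.commitAsync are the actual Kafka/Spark APIs, and stream/offsets are as in the snippet above):

{code}
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets

// Hypothetical callback that just logs whether each async commit succeeded.
val commitCallback = new OffsetCommitCallback {
  override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    if (exception != null) {
      // A non-null exception means the commit failed and may need a retry.
      println(s"Offset commit failed: ${exception.getMessage}")
    } else {
      println(s"Committed offsets: $committed")
    }
  }
}

// Commit only after the batch's parquet files have been written to S3.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, commitCallback)
{code}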

Getting really close to putting this system into production. I've tweaked quite a few settings on the Kafka consumer (I can provide the ConsumerConfigs if that would help). Streaming Kafka 0.10 has been very impressive so far!



> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4396
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justin Miller
>
> I've been seeing a curious error with kafka 0.10 (spark 2.11); these may be two separate errors, I'm not sure. What's puzzling is that I'm setting auto.offset.reset to latest and it's still throwing an OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)
> {code}
> val kafkaParams = Map[String, Object](
>       "group.id" -> consumerGroup,
>       "bootstrap.servers" -> bootstrapServers,
>       "key.deserializer" -> classOf[ByteArrayDeserializer],
>       "value.deserializer" -> classOf[MessageRowDeserializer],
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean),
>       "max.poll.records" -> persisterConfig.maxPollRecords,
>       "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>       "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>       "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>       "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
>     )
> {code}
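> For context, this map is wired into the direct stream roughly as in the sketch below (the topic name, streaming context, and MessageRow value type are placeholders; KafkaUtils.createDirectStream, LocationStrategies, and ConsumerStrategies come from spark-streaming-kafka-0-10):
> {code}
> import org.apache.spark.streaming.kafka010._
>
> // Hypothetical wiring: ssc is the StreamingContext and "events" the topic;
> // MessageRow stands in for whatever MessageRowDeserializer produces.
> val stream = KafkaUtils.createDirectStream[Array[Byte], MessageRow](
>   ssc,
>   LocationStrategies.PreferConsistent,
>   ConsumerStrategies.Subscribe[Array[Byte], MessageRow](Seq("events"), kafkaParams)
> )
> {code}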
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic=231884473}
>         at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
>         at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>         at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>         at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
