Date: Fri, 10 Mar 2017 21:33:04 +0000 (UTC)
From: "Cody Koeninger (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Commented] (SPARK-19888) Seeing offsets not resetting even when reset policy is configured explicitly

    [ https://issues.apache.org/jira/browse/SPARK-19888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905730#comment-15905730 ]

Cody Koeninger commented on SPARK-19888:
----------------------------------------

That stacktrace also shows a ConcurrentModificationException, yes? See SPARK-19185 for that.

See e.g. SPARK-19680 for background on why an offset-out-of-range error may occur on an executor when it doesn't on the driver. Although if you're using reset latest, this is kind of surprising unless you have really short retention.

> Seeing offsets not resetting even when reset policy is configured explicitly
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-19888
>                 URL: https://issues.apache.org/jira/browse/SPARK-19888
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Justin Miller
>
> I was told to post this in a Spark ticket from KAFKA-4396:
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two separate errors, I'm not sure.
> What's puzzling is that I'm setting auto.offset.reset to latest and it's still throwing an OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms" -> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic=231884473}
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> 	at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, xyz): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org
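[Editor's note] The executor/driver asymmetry Cody points to comes from the fact that each task must read the exact offset range the driver already computed, so the executor-side consumer effectively runs without a reset policy even when the driver's consumer has one. A minimal self-contained Scala sketch of that decision logic (illustrative names only, not code from Spark or Kafka) may make the asymmetry concrete:

```scala
// Toy model of how a fetch at a requested offset resolves, given the
// partition's retained offset range and the consumer's reset policy.
// All names here are hypothetical; the real logic lives broker/client-side.
object OffsetResetSketch {
  def resolve(requested: Long, earliest: Long, latest: Long,
              policy: String): Either[String, Long] =
    if (requested >= earliest && requested <= latest) Right(requested)
    else policy match {
      case "earliest" => Right(earliest)
      case "latest"   => Right(latest)
      // With no usable reset policy, an aged-out offset surfaces as an
      // error (OffsetOutOfRangeException) instead of silently resetting.
      case _ => Left(s"OffsetOutOfRange: $requested not in [$earliest, $latest]")
    }

  def main(args: Array[String]): Unit = {
    // Driver-style consumer with "latest": an aged-out offset resets.
    assert(resolve(5L, 100L, 200L, "latest") == Right(200L))
    // Executor-style consumer pinned to exact offsets: hard error.
    assert(resolve(5L, 100L, 200L, "none").isLeft)
    // In-range offsets are untouched either way.
    assert(resolve(150L, 100L, 200L, "none") == Right(150L))
    println("ok")
  }
}
```

Under this model, "reset latest" on the driver hides retention-expired offsets, which is why seeing the exception at all suggests data aged out between the driver computing the range and the executor fetching it.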