spark-issues mailing list archives

From "kaushik srinivas (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
Date Tue, 20 Mar 2018 06:55:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405887#comment-16405887 ]

kaushik srinivas commented on SPARK-19185:
------------------------------------------

Same issue found with Kafka Spark Streaming 010.

With an increased batch window, this issue appears more frequently.

SPARK-23663 has been created for reference. Is disabling windowing the workaround as of now?

We see this in Spark 2.2.0.
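
For reference, a minimal sketch of the other mitigation this points at: disabling the 0-10 connector's consumer cache so each task builds its own KafkaConsumer. The property below is the consumer-cache switch documented for spark-streaming-kafka-0-10; verify it against your connector version, and expect extra consumer-setup cost per task.

{code}
import org.apache.spark.SparkConf

// Sketch, not an official fix: trade the shared CachedKafkaConsumer
// (and this exception) for one KafkaConsumer per task.
val conf = new SparkConf()
  .setAppName("windowed-kafka-stream") // hypothetical app name
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
{code}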

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -------------------------------------------------------------------------
>
>                 Key: SPARK-19185
>                 URL: https://issues.apache.org/jira/browse/SPARK-19185
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>            Reporter: Kalvin Chau
>            Priority: Major
>              Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is not safe for multi-threaded access") with the CachedKafkaConsumer. I've been working through debugging this issue, and after looking through some of the Spark source code I think this is a bug.
> Our setup is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We would see the exception when, in one executor, two task worker threads are assigned the same TopicPartition but different sets of offsets.
> Both threads would get the same CachedKafkaConsumer, and whichever task thread went first would seek and poll for all of its records; at the same time the second thread would try to seek to its own offset, but fail because it is unable to acquire the lock. (A standalone sketch of this race follows the timeline below.)
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
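> For illustration, a minimal standalone sketch of that race: it assumes only kafka-clients 0.10.x on the classpath, a hypothetical broker address, and the Kafka 0.10 poll(long) API. This is not the Spark code path, just the KafkaConsumer single-thread guard it trips over:
> {code}
> import java.util.{Arrays, Properties}
> import org.apache.kafka.clients.consumer.KafkaConsumer
> 
> object ConsumerRaceSketch {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "localhost:9092") // hypothetical broker
>     props.put("group.id", "race-demo")               // hypothetical group id
>     props.put("key.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer")
>     props.put("value.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer")
> 
>     // One consumer shared by two threads, like one CachedKafkaConsumer
>     // handed to two task worker threads.
>     val consumer = new KafkaConsumer[String, String](props)
>     consumer.subscribe(Arrays.asList("abc"))
> 
>     // Thread 1 enters poll() and holds the consumer's single-thread guard.
>     val poller = new Thread(new Runnable {
>       override def run(): Unit = consumer.poll(10000L)
>     })
>     poller.start()
>     Thread.sleep(500) // let thread 1 get inside poll()
> 
>     // Thread 2 (here, the main thread) fails acquire() and throws
>     // java.util.ConcurrentModificationException:
>     //   KafkaConsumer is not safe for multi-threaded access
>     consumer.poll(1000L)
>   }
> }
> {code}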
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> 	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> 	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2] 8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
> ... 
> {code}
> It looks like when WindowedDStream does its getOrCompute call, it computes all the sets of offsets it needs and tries to farm out the work in parallel, so each available worker task gets one of the offset ranges that need to be read.
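> As a back-of-the-envelope illustration of that fan-out, assuming one KafkaRDD per batch (which is how the 0-10 direct stream produces data):
> {code}
> // Rough arithmetic for the intervals in this report.
> val batchSeconds  = 10
> val windowSeconds = 180
> val rddsPerWindow = windowSeconds / batchSeconds // 18 KafkaRDDs unioned per window
> // Each unioned KafkaRDD contributes one partition (one offset range) per
> // TopicPartition, so up to 18 tasks can ask the cache for the same
> // TopicPartition's consumer while one window RDD is computed.
> {code}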
> After realizing what was going on I tested four states (the exception-free combination is sketched as a SparkConf after this list):
> * spark.executor.cores 1 and spark.mesos.extra.cores 0
> ** No Exceptions
> * spark.executor.cores 1 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 0
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
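> A minimal sketch of that exception-free combination, using the documented Spark and Mesos properties (Mesos client mode, as above):
> {code}
> import org.apache.spark.SparkConf
> 
> // One task thread per executor: a single core and no extra Mesos cores,
> // so two tasks can never share one executor's cached consumer.
> val conf = new SparkConf()
>   .set("spark.executor.cores", "1")
>   .set("spark.mesos.extra.cores", "0")
> {code}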
> Minimal code I was able to reproduce this with:
> The streaming batch interval was set to 2 seconds, which increased the rate of exceptions I saw.
> {code}
> // Imports assumed by this snippet (Spark 2.0.x, streaming-kafka-0-10,
> // Confluent's KafkaAvroDeserializer); ssc, brokers, groupId, schemaRegistryUrl,
> // offset, kafkaTopic and logger are defined elsewhere.
> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> 
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>   filtered.foreach { message =>
>     // NOTE: i is re-created for every record, so this logs every matching
>     // record; the logging is incidental, the window() call is what matters.
>     var i = 0
>     if (i == 0) {
>       logger.info(message)
>       i = i + 1
>     }
>   }
> }
> {code}


