spark-dev mailing list archives

From "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Subject Re: [Streaming] ConcurrentModificationExceptions when Windowing
Date Thu, 12 Jan 2017 01:43:25 GMT
Thanks for reporting this. I finally understood the root cause. Could you
please file a JIRA at https://issues.apache.org/jira/browse/SPARK?

On Wed, Jan 11, 2017 at 5:20 PM, Kalvin Chau <kalvinnchau@gmail.com> wrote:

> Here is the minimal code example with which I was able to replicate the issue.
> The batch interval is set to 2 seconds to make the exceptions happen more often.
>
> // Imports assumed by this snippet (Spark 2.0.x with spark-streaming-kafka-0-10
> // and Confluent's Avro deserializer); brokers, groupId, schemaRegistryUrl,
> // offset, kafkaTopic, ssc, and logger are defined elsewhere in our app.
> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
>
> // 180s window sliding every 30s over the direct stream.
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>
>   filtered.foreach { message =>
>     // Note: `i` is re-declared per record, so this logs every matching
>     // record; it is only here to force the windowed RDD to be evaluated.
>     var i = 0
>     if (i == 0) {
>       logger.info(message)
>       i = i + 1
>     }
>   }
> }
>
>
> On Wed, Jan 11, 2017 at 4:04 PM Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Could you post your code, please?
>>
>> On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau <kalvinnchau@gmail.com>
>> wrote:
>>
>> "spark.speculation" is not set, so it would be whatever the default is.
>>
>>
>> On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Or did you enable "spark.speculation"? If not, Spark Streaming should not
>> launch two tasks using the same TopicPartition.
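>>
>> (Speculation is off by default; for reference, disabling it explicitly is a
>> one-liner. A sketch, with sparkConf standing in for your SparkConf:)
>>
>> // Speculative execution re-launches slow tasks, which could read the same
>> // partition twice; it defaults to false.
>> sparkConf.set("spark.speculation", "false")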
>>
>> On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau <kalvinnchau@gmail.com>
>> wrote:
>>
>> I have not modified that configuration setting, and that doesn't seem to
>> be documented anywhere.
>>
>> Does the Kafka 0.10 connector require the number of cores on an executor to
>> be set to 1? I didn't see that documented anywhere either.
>>
>> On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka
>> 0.10 connector requires that it be 1.
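>>
>> (That setting controls how many streaming jobs run at once; a sketch of
>> pinning it, with sparkConf standing in for your SparkConf:)
>>
>> // spark.streaming.concurrentJobs defaults to 1; the Kafka 0.10 direct
>> // stream assumes it stays at 1.
>> sparkConf.set("spark.streaming.concurrentJobs", "1")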
>>
>> On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnchau@gmail.com>
>> wrote:
>>
>> I'm not re-using any InputDStreams, actually; this is one InputDStream with
>> a window applied to it. When Spark creates and assigns tasks to read from
>> the topic, one executor gets assigned two tasks reading from the same
>> TopicPartition. Both use the same CachedKafkaConsumer to read from the
>> TopicPartition, causing the ConcurrentModificationException in one of the
>> worker threads.
>>
>> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>> I think you may be reusing the Kafka DStream (the DStream returned by
>> createDirectStream). If you need to read from the same Kafka source twice,
>> you need to create another DStream.
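>>
>> (A sketch of what I mean, with topics and kafkaParams as in your app; the
>> second stream would typically get its own group.id via kafkaParamsB:)
>>
>> import org.apache.spark.streaming.kafka010.KafkaUtils
>> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>>
>> // Create two independent direct streams instead of consuming one twice.
>> val streamA = KafkaUtils.createDirectStream[Object, Object](
>>   ssc, PreferConsistent, Subscribe[Object, Object](topics, kafkaParams))
>> val streamB = KafkaUtils.createDirectStream[Object, Object](
>>   ssc, PreferConsistent, Subscribe[Object, Object](topics, kafkaParamsB))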
>>
>> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnchau@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> We've been running into ConcurrentModificationExceptions ("KafkaConsumer
>> is not safe for multi-threaded access") with the CachedKafkaConsumer. I've
>> been debugging this issue, and after looking through some of the Spark
>> source code I think this is a bug.
>>
>> Our set up is:
>> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using
>> Spark-Streaming-Kafka-010
>> spark.executor.cores 1
>> spark.mesos.extra.cores 1
>>
>> Batch interval: 10s, window interval: 180s, and slide interval: 30s
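>>
>> (In code, a sketch of those intervals; conf and inputStream stand in for
>> our actual setup:)
>>
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> val ssc = new StreamingContext(conf, Seconds(10))             // 10s batches
>> val windowed = inputStream.window(Seconds(180), Seconds(30))  // window, slide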
>>
>> We would see the exception when, on a single executor, two task worker
>> threads were assigned the same TopicPartition but different offset ranges.
>>
>> Both would get the same CachedKafkaConsumer, and whichever task thread ran
>> first would seek and poll for all of its records; meanwhile the second
>> thread would try to seek to its own offset but fail because it could not
>> acquire the lock.
>>
>> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
>> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
>>
>> Time1 E0 Task0 - Seeks and starts to poll
>> Time1 E0 Task1 - Attempts to seek, but fails
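>>
>> (The collision is the standard single-thread check inside KafkaConsumer; a
>> minimal sketch of it, not our actual code, with props standing in for a
>> normally configured consumer config:)
>>
>> import java.util.Collections
>> import org.apache.kafka.clients.consumer.KafkaConsumer
>> import org.apache.kafka.common.TopicPartition
>>
>> // Two threads sharing one consumer, as the two Spark task threads do here.
>> val tp = new TopicPartition("abc", 0)
>> val consumer = new KafkaConsumer[String, String](props)
>> consumer.assign(Collections.singletonList(tp))
>> def worker(offset: Long) = new Thread(new Runnable {
>>   def run(): Unit = {
>>     consumer.seek(tp, offset) // guarded by KafkaConsumer.acquire()
>>     consumer.poll(100)        // slower thread throws ConcurrentModificationException
>>   }
>> })
>> worker(0L).start()
>> worker(1L).start()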
>>
>> Here are some relevant logs:
>> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
>> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
>> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
>> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
>> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
>> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
>> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
>> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
>> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2]  8237
>> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
>> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
>> ...
>>
>> It looks like when WindowedDStream does its getOrCompute call, it computes
>> all the sets of offsets it needs and tries to farm the work out in parallel,
>> so each available worker thread gets one of the offset ranges to read.
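>>
>> (Back-of-the-envelope from our settings; the numbers are illustrative:)
>>
>> // A 180s window over 10s batches unions 18 KafkaRDDs (hence the UnionRDD in
>> // the stack trace). Each KafkaRDD has a partition for
>> // TopicPartition("test-topic", 2), so the window RDD ends up with 18
>> // partitions reading that same TopicPartition.
>> val rddsPerWindow = 180 / 10 // = 18
>> // With two task slots per executor, two of those partitions can run on the
>> // same executor at the same time and share one CachedKafkaConsumer.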
>>
>> After realizing what was going on, I tested four configurations (see the
>> workaround sketch after this list):
>>
>>    - spark.executor.cores 1 and spark.mesos.extra.cores 0
>>       - No Exceptions
>>    - spark.executor.cores 1 and spark.mesos.extra.cores 1
>>       - ConcurrentModificationException
>>    - spark.executor.cores 2 and spark.mesos.extra.cores 0
>>       - ConcurrentModificationException
>>    - spark.executor.cores 2 and spark.mesos.extra.cores 1
>>       - ConcurrentModificationException
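>>
>> (So the workaround for now is to pin each executor to a single task slot;
>> as a sketch, with sparkConf standing in for our SparkConf:)
>>
>> // The only combination above without exceptions:
>> sparkConf
>>   .set("spark.executor.cores", "1")
>>   .set("spark.mesos.extra.cores", "0")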
>>
>>
>> I'm not sure what the best solution to this is if we want to be able to
>> have N task threads read from the same TopicPartition to increase
>> parallelism. You could possibly allow N CachedKafkaConsumers for the same
>> TopicPartition (see the sketch below).
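>>
>> (Purely hypothetical: the consumer cache could also be keyed by the task
>> thread, so N threads reading the same TopicPartition each get their own
>> consumer. A sketch:)
>>
>> import java.{util => ju}
>> import scala.collection.mutable
>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>
>> // Hypothetical cache key: adds the task thread id to the existing
>> // group/topic/partition key, so each thread gets its own consumer.
>> case class CacheKey(groupId: String, topic: String, partition: Int, threadId: Long)
>>
>> object PerThreadConsumers {
>>   private val cache =
>>     mutable.HashMap.empty[CacheKey, KafkaConsumer[Object, Object]]
>>
>>   def get(groupId: String, topic: String, partition: Int,
>>           kafkaParams: ju.Map[String, Object]): KafkaConsumer[Object, Object] =
>>     cache.synchronized {
>>       val key = CacheKey(groupId, topic, partition, Thread.currentThread().getId)
>>       cache.getOrElseUpdate(key, new KafkaConsumer[Object, Object](kafkaParams))
>>     }
>> }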
>>
>> Any thoughts on this?
>>
>> Thanks,
>> Kalvin
>>
