beam-commits mailing list archives

From "Stas Levin (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-979) ConcurrentModificationException exception after hours of running
Date Tue, 15 Nov 2016 12:07:58 GMT

    [ https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666975#comment-15666975 ]

Stas Levin commented on BEAM-979:
---------------------------------

I think it may have to do with {{StateSpecFunctions#mapSourceFunction}}, where we have this:

{code:java}

// read microbatch.
final List<WindowedValue<T>> readValues = new ArrayList<>();

// ...

while (!finished) {
  readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
      GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
  finished = !reader.advance();
}

// ...

return Iterators.unmodifiableIterator(readValues.iterator());

{code}

I think there may be a scenario where {{readValues}} is modified and read concurrently,
which would cause the {{ConcurrentModificationException}} we see in the stack trace to be
thrown upon invoking {{Iterator#next}}.
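For reference, a minimal standalone sketch of the failure mode in the stack trace, using only the JDK. The class and method names are hypothetical, {{String}} stands in for {{WindowedValue<T>}}, and the plain {{ArrayList#iterator}} stands in for Guava's {{Iterators.unmodifiableIterator}} (that wrapper only forbids {{remove()}} and delegates {{next()}} to the wrapped iterator, which is why {{Iterators$3.next}} calls into {{ArrayList$Itr.next}} in the trace). Note that no second thread is needed: for an {{ArrayList}}, any structural modification after its iterator was created makes the following {{next()}} call throw.

{code:java}
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class; String stands in for WindowedValue<T>.
class ComodificationDemo {

  // Returns true if next() throws ConcurrentModificationException after the
  // backing ArrayList was structurally modified following iterator creation.
  static boolean nextThrowsAfterModification() {
    List<String> readValues = new ArrayList<>();
    readValues.add("a");
    readValues.add("b");

    // The iterator records the list's modification count at this point.
    Iterator<String> it = readValues.iterator();

    // A structural modification after the iterator exists bumps that count.
    readValues.add("c");

    try {
      it.next(); // ArrayList$Itr.next -> checkForComodification
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(nextThrowsAfterModification()); // prints "true"
  }
}
{code}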

One of the workarounds I'm trying is to change
{{final List<WindowedValue<T>> readValues = new ArrayList<>();}} to
{{final Collection<WindowedValue<T>> readValues = new ConcurrentLinkedQueue<>();}}.
Since making this change the exception has not recurred, but given its nature it's too soon
to tell for sure.
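A similar sketch of why the workaround makes the exception go away: {{ConcurrentLinkedQueue}} iterators are weakly consistent, i.e. they never throw {{ConcurrentModificationException}}, traverse the elements present at construction exactly once, and may or may not reflect later modifications. The class name is hypothetical and {{String}} again stands in for {{WindowedValue<T>}}; declaring the variable as {{Collection}} keeps a {{readValues.iterator()}} call compiling unchanged.

{code:java}
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; String stands in for WindowedValue<T>.
class WeaklyConsistentDemo {

  // Modifies a ConcurrentLinkedQueue after creating an iterator over it and
  // returns how many elements that iterator observed.
  static int countWhileModifying() {
    // Declared as Collection, mirroring the proposed workaround.
    Collection<String> readValues = new ConcurrentLinkedQueue<>();
    readValues.add("a");
    readValues.add("b");

    Iterator<String> it = readValues.iterator();
    // Modify after the iterator exists; a weakly consistent iterator does not throw.
    readValues.add("c");

    int seen = 0;
    while (it.hasNext()) {
      it.next();
      seen++;
    }
    // "a" and "b" are guaranteed to be seen; "c" may or may not be.
    return seen;
  }

  public static void main(String[] args) {
    System.out.println("elements seen: " + countWhileModifying());
  }
}
{code}

The trade-off is that the exception goes away, but an iterator created while the collection is still being appended to is not guaranteed to see the later elements, so the change can hide a problem rather than fix it.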

Moreover, while this change may prevent a {{ConcurrentModificationException}} from being thrown,
we still need to consider whether the exception is merely a symptom of some scenario being
improperly handled rather than a problem per se.

> ConcurrentModificationException exception after hours of running
> ----------------------------------------------------------------
>
>                 Key: BEAM-979
>                 URL: https://issues.apache.org/jira/browse/BEAM-979
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>
> {code}
> User class threw exception: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 4483.0 (TID 44548, xxxx.com): java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
