spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming
Date Thu, 05 Oct 2017 01:57:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192364#comment-16192364 ]

Sean Owen commented on SPARK-21999:
-----------------------------------

[~michaeln_apache] I'll have to see if we can block you from accessing the Spark JIRA if you
keep reopening the issue. Refer to your comment at https://issues.apache.org/jira/browse/SPARK-21999?focusedCommentId=16185130&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16185130

Spark Streaming is inherently asynchronous with respect to the driver process. Batch processing
is also inherently asynchronous, in that many things happen at once. You're writing as if this
were unexpected, as with your comment about map vs. flatMap, which suggests some fundamental
misunderstandings about Spark.

There is no reproducible test case here demonstrating behavior that doesn't make sense or is
inconsistent with an API, and such a reproduction is a key component of a bug report.

Here's a more productive way forward: propose a change that you think resolves the problem.
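
For what it's worth, the failure mode in the trace below (ArrayList.writeObject throwing
ConcurrentModificationException while ClosureCleaner.ensureSerializable java-serializes a
closure) can be reproduced without Spark at all: Java serialization of an ArrayList fails
intermittently if another thread mutates the list mid-write. A minimal sketch using only the
JDK; no Spark classes are involved, and all names are illustrative, not taken from the
reporter's app:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.ArrayList

object CmeDuringSerialization {
  def main(args: Array[String]): Unit = {
    val shared = new ArrayList[Integer]()
    (1 to 100000).foreach(i => shared.add(i))

    // A background thread keeps mutating the list, standing in for any
    // driver-side thread that updates state reachable from a streaming closure.
    val mutator = new Thread(new Runnable {
      def run(): Unit = while (true) { shared.add(0); shared.remove(shared.size() - 1) }
    })
    mutator.setDaemon(true)
    mutator.start()

    // Java-serialize the list, as ClosureCleaner.ensureSerializable does on the
    // JobGenerator thread. ArrayList.writeObject checks modCount and throws
    // ConcurrentModificationException if the list changed during the write.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(shared)
  }
}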

> ConcurrentModificationException - Spark Streaming
> -------------------------------------------------
>
>                 Key: SPARK-21999
>                 URL: https://issues.apache.org/jira/browse/SPARK-21999
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Michael N
>            Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting ConcurrentModificationException
intermittently.  When it occurs, Spark does not honor the specified value of spark.task.maxFailures;
it aborts the current batch and fetches the next one, which results in lost data.  The exception
stack is listed below.
> This instance of ConcurrentModificationException is similar to the issue at https://issues.apache.org/jira/browse/SPARK-17463,
which was about serialization of accumulators in heartbeats.  However, my Spark Streaming app
does not use accumulators.
> The stack trace listed below occurred on the Spark master, in the Spark Streaming driver,
at the time of the data loss.
> From the line of code in the first stack trace, can you tell which object Spark was trying
to serialize?  What is the root cause of this issue?
> Because this issue results in lost data as described above, could you have it fixed ASAP?
> Thanks.
> Michael N.
> ----------------
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> 	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.map(List.scala:285)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
> 	at java.util.ArrayList.writeObject(ArrayList.java:766)
> 	at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	... 60 more
> 2017-09-12 07:02:10.029 ERROR
> org.apache.spark.SparkException: Task not serializable
> 	[stack trace identical to the one above]
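
As to which object Spark was serializing: per the top of the trace, ClosureCleaner.ensureSerializable
is java-serializing the function passed to RDD.mapPartitions (invoked from
MapPartitionedDStream.compute) on the JobGenerator event-loop thread, and the ArrayList that
throws is something reachable from that closure. If, hypothetically, the app captures a
driver-side java.util.List that another thread updates, the usual remedy is to capture an
immutable snapshot before building the closure. A hedged sketch; the names shared, lines, and
safeTransform are invented for illustration:

import java.util.{List => JList}
import scala.collection.JavaConverters._
import org.apache.spark.streaming.dstream.DStream

// Hypothetical driver-side state that some other thread mutates.
def safeTransform(shared: JList[String], lines: DStream[String]): DStream[String] = {
  // Copy under the list's monitor before the closure is built, so the
  // serialized closure captures only the immutable snapshot.
  val snapshot: Set[String] = shared.synchronized { shared.asScala.toSet }
  lines.mapPartitions(iter => iter.filter(snapshot.contains))
}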


