spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steve Loughran (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-21999) ConcurrentModificationException - Spark Streaming
Date Fri, 06 Oct 2017 09:15:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194332#comment-16194332
] 

Steve Loughran commented on SPARK-21999:
----------------------------------------

Telling a project "their design is wrong" and expecting a co-operative response isn't going
to work, and if something is a core part of the architecture, it's not going to change. That's
why he's closing it. It'd be like going to linux-kernel dev and demanding that they switched
to a microkernel architecture, or emacs-dev and pointing out that their key encodings make
no sense on modern keyboards. All WONTFIX complaints where you aren't going get any satisfaction
in raising -so why bother.

bq. About your other points, I already modified my code to get around this issue. 

good to hear. In my many years as a software developer, I've come to realise that software
development is about working round the implementation and architectural decisions which my
predecessors made, and which, in modern times, don't seem relevant. All while trying not to
do the same things yourself. It's a losing battle, but being able to work around problems
is the fundamental skill that seems to hold

bq. 1. In the first place, why does Spark serialize the application objects asynchronously
while the streaming application is running continuously from batch to batch ?

Don't know. to find out, I'd find the  lines where it takes place, select it in my IDE, hit
"show history for selection" & work back from the pull requests 

bq. 2. If Spark needs to do this type of serialization at all, why does it not do at the end
of the batch synchronously ?

It's how it checkpoints the state of a streaming application. That's a fundamental need. 

bq. But Sean did not provide the answers and instead just kept closing that ticket. If he
does not know the answers or information for tickets, he should let someone else who has such
information answers them.

The source is there. And along with the source comes the SCM history, which provides the rationale
for most decisions.

Now, to close this, and to stop Sean taking all the blame, I'll be closing this as a WONTFIX.
Please, only re-open if you have something to contribute, in particular, as I mentioned, documentation.
Grab the latest source, improve the streaming docs, follow the spark contribution process
& submit a github pull request, see how it goes. 




> ConcurrentModificationException - Spark Streaming
> -------------------------------------------------
>
>                 Key: SPARK-21999
>                 URL: https://issues.apache.org/jira/browse/SPARK-21999
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Michael N
>            Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8.  I am getting ConcurrentModificationException
intermittently.  When it occurs, Spark does not honor the specified value of spark.task.maxFailures.
So Spark aborts the current batch  and fetch the next batch, so it results in lost data. Its
exception stack is listed below. 
> This instance of ConcurrentModificationException is similar to the issue at https://issues.apache.org/jira/browse/SPARK-17463,
which was about Serialization of accumulators in heartbeats.  However, my Spark stream app
does not use accumulators. 
> The stack trace listed below occurred on the Spark master in Spark streaming driver at
the time of data loss.   
> From the line of code in the first stack trace, can you tell which object Spark was trying
to serialize ?  What is the root cause for this issue  ?  
> Because this issue results in lost data as described above, could you have this issue
fixed ASAP ?
> Thanks.
> Michael N.,
> ----------------
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> 	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.map(List.scala:285)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
> 	at java.util.ArrayList.writeObject(ArrayList.java:766)
> 	at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	... 60 more
> 2017-09-12 07:02:10.029 ERROR 
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> 	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> 	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.map(List.scala:285)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
> 	at java.util.ArrayList.writeObject(ArrayList.java:766)
> 	at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> 	... 60 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message