spark-issues mailing list archives

From "The Facts (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22163) Design Issue of Spark Streaming that Causes Random Run-time Exception
Date Wed, 11 Oct 2017 06:12:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199848#comment-16199848 ]

The Facts commented on SPARK-22163:
-----------------------------------

Sean's latest claim is further evidence of his pattern of disinformation and abuse.
A JIRA account with Apache has no value when Sean keeps closing tickets without understanding
them or, worse yet, makes blatantly false claims. So I disregarded their threat to disable my account
and provided them with the facts listed below. The admins played politics and did not have
the integrity to review this ticket and see Sean's false claims.

For instance, even though the text description of the ticket clearly says

"My application does not spin up its own thread. All the threads are controlled by Spark.",


he still made the blatantly false, opposite claim that

 "you imply this happens outside of Spark's threads, in an app thread you spawn."

There is a saying that people do not quit their jobs; they quit their bosses. For open-source
projects without "bosses", the analogy is that people don't stop contributing, or quit, because of
the work. They stop contributing, or quit, because of abusers who are not only more interested in
closing tickets than in understanding them, but who also abuse their role on the project to block
other people from opening tickets.



> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-22163
>                 URL: https://issues.apache.org/jira/browse/SPARK-22163
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark Streaming
> Kafka
> Linux
>            Reporter: The Facts
>
> The application's objects can contain a List and can be modified dynamically as well. However,
> the Spark Streaming framework asynchronously serializes the application's objects as the application
> runs. Therefore, it causes a random run-time exception on the List whenever the framework
> happens to serialize the application's objects while the application is modifying a List in one of
> its own objects.
> In fact, there are multiple bugs reported about
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. The design issue of the Spark Streaming framework
> is that it does this serialization asynchronously. Instead, it should either
> 1. do this serialization synchronously, which is preferred because it eliminates the issue completely; or
> 2. allow each application to configure whether this serialization is done synchronously or
> asynchronously, depending on the nature of the application.
> Also, the Spark documentation should describe the conditions that trigger Spark to do this
> type of serialization asynchronously, so that applications can work around them until a fix
> is provided.
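> For illustration only, one possible application-side mitigation (an assumption on my part, not the requested framework fix) is to keep such data in a collection whose serialization works from a stable snapshot, for example java.util.concurrent.CopyOnWriteArrayList from the JDK:
> {code}
> import java.util.List;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> // In the sample application further below, the shared field could be declared as a
> // CopyOnWriteArrayList instead of an ArrayList. CopyOnWriteArrayList copies its
> // backing array on every mutation and serializes from a stable snapshot of that
> // array, so another thread serializing it concurrently cannot hit
> // ConcurrentModificationException. The trade-off is the extra copy on each write.
> private List<String> appStringList = new CopyOnWriteArrayList<>();
> {code}
> Note that this only prevents the exception; because the multi-step update of the list is still not atomic, an asynchronous serialization can capture it mid-update, which is the consistency problem described further below.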
> ===
> Vadim Semenov and Steve Loughran, per your inquiries in ticket https://issues.apache.org/jira/browse/SPARK-21999,
> I am posting the reply here because this issue involves Spark's design and not necessarily
> its code implementation.
> —
> My application does not spin up its own thread. All the threads are controlled by Spark.
> Batch interval = 5 seconds
> Batch #3
> 1. Driver - Spark thread #1 - starts batch #3 and blocks until all slave threads are
> done with this batch.
> 2. Slave A - Spark thread #2. Say it takes 10 seconds to complete.
> 3. Slave B - Spark thread #3. Say it takes 1 minute to complete.
> 4. Both thread #1 for the driver and thread #2 for Slave A do not jump ahead and process
> batch #4. Instead, they wait for thread #3 until it is done. => So there is already synchronization
> among the threads within the same batch. Also, batch-to-batch processing is synchronous.
> 5. After Spark thread #3 is done, the driver does other processing to finish the current
> batch. In my case, it updates a list of objects.
> The above steps repeat for the next batch #4 and subsequent batches.
> Based on the exception stack trace, it looks like in step 5 Spark has another thread,
> #4, that serializes application objects asynchronously. This causes random occurrences of
> ConcurrentModificationException, because the list of objects is being changed by Spark's own
> thread #1 for the driver.
> So the issue is not that my application "is modifying a collection asynchronously w.r.t.
> Spark" as Sean kept claiming. Instead, it is Spark's asynchronous operations among its own
> different threads within the same batch that cause this issue.
> Since Spark controls all the threads and their synchronization, it is a Spark design
> issue that there is no synchronization between threads #1 and #4, which triggers ConcurrentModificationException.
> That is the root cause of this issue.
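> The same race can be illustrated outside Spark with plain Java (a minimal sketch added here for clarity; the class and variable names are made up for the illustration): one thread serializes an ArrayList while another thread mutates it, and ArrayList.writeObject intermittently throws ConcurrentModificationException, just like the stack trace quoted above.
> {code}
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
> import java.util.ArrayList;
> import java.util.ConcurrentModificationException;
> import java.util.List;
>
> public class SerializeWhileMutating {
>     public static void main(String[] args) throws Exception {
>         List<String> list = new ArrayList<>();
>         for (int i = 0; i < 1000; i++) {
>             list.add("a-" + i);
>         }
>
>         // Plays the role of Spark's thread #1: keeps rotating the list.
>         Thread mutator = new Thread(() -> {
>             while (!Thread.currentThread().isInterrupted()) {
>                 String tmp = list.get(0);
>                 list.remove(0);
>                 list.add(tmp);
>             }
>         });
>         mutator.start();
>
>         // Plays the role of Spark's thread #4: serializes the list repeatedly.
>         for (int i = 0; i < 100000; i++) {
>             try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
>                 out.writeObject(list);
>             } catch (ConcurrentModificationException e) {
>                 System.out.println("Reproduced after " + i + " serializations: " + e);
>                 break;
>             }
>         }
>         mutator.interrupt();
>     }
> }
> {code}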
> Further, even if the application does not modify its list of objects, in step 5 the driver
> could be modifying multiple plain objects, say two integers. In thread #1 the driver could
> have updated integer X, and before it can update integer Y, Spark's thread #4 asynchronously
> serializes the application objects. So the persisted serialized data does not match the
> actual data. This results in a variant of this issue with a false-positive condition
> where the serialized checkpoint data is only partially correct.
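> As a concrete illustration of that torn-snapshot case (again a sketch added here, not Spark code): if the object below is serialized between the two field updates, the persisted copy holds the new X but the old Y, and no exception is thrown to signal it.
> {code}
> import java.io.Serializable;
>
> public class Counters implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     private int x;
>     private int y;
>
>     // Invariant intended by the application: x and y always advance together.
>     public void advance() {
>         x++;
>         // An asynchronous serialization that runs at this point persists the new x
>         // with the old y, producing checkpoint data that never matches any state
>         // the application actually had.
>         y++;
>     }
> }
> {code}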
> One solution for both issues is to modify Spark's design and allow the serialization
> of application objects by Spark's thread #4 to be configured per application as either
> asynchronous or synchronous with Spark's thread #1. That way, it is up to individual applications
> to decide based on the nature of their business requirements and needed throughput.
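> For illustration of proposal #2 only, such a per-application switch might be expressed through SparkConf; the configuration key below is hypothetical and does not exist in Spark today:
> {code}
> // Hypothetical key, shown only to illustrate the per-application switch proposed above.
> SparkConf sparkConf = new SparkConf()
>     .setAppName("JavaDirectKafkaWordCount")
>     .set("spark.streaming.checkpoint.serializeSynchronously", "true");
> {code}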
> ===
> The code is listed below. Due to the asynchronous nature of Spark's thread operations
> and different hardware, the issue relating to this ticket occurs randomly. So you may need
> to tweak the batch duration.
> {code}
> package test;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.regex.Pattern;
> import scala.Tuple2;
> import kafka.serializer.StringDecoder;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.Durations;
> /**
>  * Consumes messages from one or more topics in Kafka and does wordcount.
>  * Usage: JavaDirectKafkaWordCount <brokers> <topics>
>  *   <brokers> is a list of one or more Kafka brokers
>  *   <topics> is a list of one or more kafka topics to consume from
>  *
>  * Example:
>  *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
>  *      topic1,topic2
>  */
> public final class JavaDirectKafkaWordCount_Extended implements VoidFunction<JavaPairRDD<String, Integer>> {
>    
>   private static final Pattern SPACE = Pattern.compile(" ");
>  
>   private List<String> appStringList;
>   public static void main(String[] args) throws Exception {
>       try {
>         if (args.length < 2) {
>             System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
>                 "  <brokers> is a list of one or more Kafka brokers\n" +
>                 "  <topics> is a list of one or more kafka topics to consume from\n\n");
>             System.exit(1);
>         }
>
>         JavaDirectKafkaWordCount_Extended javaDirectKafkaWordCount = new JavaDirectKafkaWordCount_Extended();
>         javaDirectKafkaWordCount.setupStreamApp(args);
>       } catch (Throwable exc) {
>           exc.printStackTrace();
>       }
>   }
>  
>   private void setupStreamApp (String[] args) throws InterruptedException {     
>     // StreamingExamples.setStreamingLogLevels();
>         String brokers = args[0];
>         String topics = args[1];
>         // create list of string with dummy values.
>         appStringList = new ArrayList<>();
>         for (int i = 0; i < 1000; ) {
>             appStringList.add("a-"+ i++);           
>         }
>    
>         SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
>        
>         // Create context with a 10-second batch interval
>         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
>         jssc.checkpoint("./test-checkpoint/");
>    
>         Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
>         Map<String, String> kafkaParams = new HashMap<>();
>         kafkaParams.put("metadata.broker.list", brokers);
>    
>         // Create direct kafka stream with brokers and topics
>         JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>             jssc,
>             String.class,
>             String.class,
>             StringDecoder.class,
>             StringDecoder.class,
>             kafkaParams,
>             topicsSet
>         );
>    
>         // Get the lines, split them into words, count the words and print
>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>           /**
>              *
>              */
>             private static final long serialVersionUID = 3769940753726592424L;
>             @Override
>               public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>               }
>         });
>        
>         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>               @Override
>               public Iterator<String> call(String x) {
>                 return Arrays.asList(SPACE.split(x)).iterator();
>               }
>         });
>        
>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>               new PairFunction<String, String, Integer>() {
>                 @Override
>                 public Tuple2<String, Integer> call(String s) {
>                   return new Tuple2<>(s, 1);
>                 }
>               }).reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                 @Override
>                 public Integer call(Integer i1, Integer i2) {
>                   return i1 + i2;
>                 }
>         });
>        
>         wordCounts.foreachRDD(this);
>                
>         // Start the computation
>         jssc.start();
>         jssc.awaitTermination();
>     }
>     @Override
>     public void call(JavaPairRDD<String, Integer> dataStream) throws Exception {
>         System.out.println("start foreachRDD");
>
>         dataStream.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
>             // Assuming that there are two slave threads, this foreachPartition code
>             // corresponds to Steps 2 and 3 for threads #2 and #3 in the high-level
>             // sequence given in the textual description above the code.
>             @Override
>             public void call(Iterator<Tuple2<String, Integer>> tuples) throws Exception {
>                 if (tuples == null || !tuples.hasNext()) {
>                     return;
>                 }
>
>                 while (tuples.hasNext()) {
>                     // The step below is not related to the issue. It just simulates
>                     // some operation in the slave threads, for completeness.
>                     System.out.println(tuples.next()._1);
>                 }
>             }
>         });
>         /* ===>
>          *  The steps below correspond to Step 5 of the high-level sequence and Spark's
>          *  thread #1, as described in the textual description above the code.
>          *
>          *  These steps are where ConcurrentModificationException occurs randomly,
>          *  as explained in Step 5 of the textual description of this ticket.
>          *  For the purpose of this test, these steps update the list by simply rotating
>          *  the entries.
>          *
>          *  Based on the stack trace, Spark has another thread, i.e. thread #4, that asynchronously
>          *  serializes the application objects during the next three operations. So it randomly
>          *  encounters ConcurrentModificationException, because Spark's thread #4 tries to serialize
>          *  appStringList while Spark's thread #1 is modifying the same list.
>          */
>         String tmp = appStringList.get(0);
>         appStringList.remove(0);
>         appStringList.add(tmp);
>         System.out.println("end foreachRDD");
>     }   
> }
> {code}


