spark-issues mailing list archives

From "Andrei Filip (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-3876) Doing a RDD map/reduce within a DStream map fails with a high enough input rate
Date Mon, 13 Oct 2014 06:57:33 GMT

    [ https://issues.apache.org/jira/browse/SPARK-3876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169019#comment-14169019 ]

Andrei Filip commented on SPARK-3876:
-------------------------------------

In a nutshell, the use case is to parallelize many operations performed on the same input
and then aggregate their outputs. This approach was suggested to me in this discussion (towards the end):
http://chat.stackoverflow.com/rooms/61251/discussion-between-smola-and-andrei
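When the set of operations is small and fixed, the fan-out-and-aggregate pattern described above can in principle be done without a nested RDD: each record is run through every operation locally and the results are summed. Below is a minimal plain-Java sketch of that per-record logic, with no Spark dependency. The class FanOutAggregate and the helper applyAll are hypothetical illustrations, and String::length stands in for LengthGetter.getStrLength. The sketch assumes the operations could be captured as a plain, Serializable list in the DStream map closure rather than wrapped in an RDD.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

public class FanOutAggregate {
    // Hypothetical helper: applies every operation to the same input and
    // sums the results. parallelStream() spreads the work over the common
    // ForkJoinPool of this single JVM; it does not distribute across a cluster.
    static int applyAll(List<ToIntFunction<String>> ops, String input) {
        return ops.parallelStream()
                .mapToInt(op -> op.applyAsInt(input))
                .sum();
    }

    public static void main(String[] args) {
        // Three identical length operations, standing in for the three
        // LengthGetter instances in the issue description.
        List<ToIntFunction<String>> ops = Arrays.<ToIntFunction<String>>asList(
                String::length, String::length, String::length);
        System.out.println(applyAll(ops, "hello")); // prints 15
    }
}
```

In the reported job, the analogous change would be to apply the getters from objList (a plain List) inside sentences.map(...) instead of calling objRdd.map(...).reduce(...). Whether that is adequate depends on how many operations there are and how expensive each one is.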

To be honest, the more fundamental question is whether Spark Streaming is actually appropriate
for this sort of use case.

> Doing a RDD map/reduce within a DStream map fails with a high enough input rate
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-3876
>                 URL: https://issues.apache.org/jira/browse/SPARK-3876
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.2
>            Reporter: Andrei Filip
>
> Setup: a custom receiver, JavaRandomSentenceReceiver, that generates random strings at a configurable rate.
> A class that does work on a received string:
> class LengthGetter implements Serializable{
> 	public int getStrLength(String s){
> 		return s.length();
> 	}
> }
> The following code:
> List<LengthGetter> objList = Arrays.asList(new LengthGetter(), new LengthGetter(), new LengthGetter());
>
> 		final JavaRDD<LengthGetter> objRdd = sc.parallelize(objList);
>
> 		JavaInputDStream<String> sentences = jssc.receiverStream(new JavaRandomSentenceReceiver(frequency));
> 		
> 		sentences.map(new Function<String, Integer>() {
> 			@Override
> 			public Integer call(final String input) throws Exception {
> 				Integer res = objRdd.map(new Function<LengthGetter, Integer>() {
> 					@Override
> 					public Integer call(LengthGetter lg) throws Exception {
> 						return lg.getStrLength(input);
> 					}
> 				}).reduce(new Function2<Integer, Integer, Integer>() {
> 					
> 					@Override
> 					public Integer call(Integer left, Integer right) throws Exception {
> 						return left + right;
> 					}
> 				});
> 				
> 				
> 				return res;
> 			}			
> 		}).print();
> fails for high enough frequencies with the following stack trace:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:0 failed 1 times, most recent failure: Exception failure in TID 3 on host localhost: java.lang.NullPointerException
>         org.apache.spark.rdd.RDD.map(RDD.scala:270)
>         org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:72)
>         org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:29)
> Other information that might be useful: the batch duration is set to 1 sec, and the application
> fails at JavaRandomSentenceReceiver frequencies as low as 2 Hz (1 Hz, for example, works).
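A likely explanation for the NullPointerException raised from RDD.map is Spark's rule that RDD transformations and actions can only be invoked from the driver, not from inside another transformation. In the code above, objRdd is captured by the DStream map function, serialized along with it, and then used inside a task. Presumably, the RDD's reference to its SparkContext is a transient field in the 1.0.x code, so it would be null after deserialization. The plain-Java sketch below reproduces only that serialization effect and has no Spark dependency. DriverContext, FakeRdd, and roundTrip are hypothetical stand-ins, not Spark classes. The sketch does not explain why 1 Hz works while 2 Hz fails.

```java
import java.io.*;

public class TransientFieldDemo {
    // Hypothetical stand-in for a driver-only handle such as SparkContext.
    static class DriverContext {
        int nextId() { return 42; }
    }

    // Hypothetical stand-in for an RDD: Serializable, but its handle to the
    // driver-side context is transient, so it is skipped during serialization
    // and field initializers do not re-run on deserialization.
    static class FakeRdd implements Serializable {
        private static final long serialVersionUID = 1L;
        transient DriverContext ctx = new DriverContext();

        int map() {
            return ctx.nextId(); // NullPointerException if ctx was dropped
        }
    }

    // Serialize and deserialize an object, as happens when a closure that
    // captures it is shipped to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeRdd onDriver = new FakeRdd();
        System.out.println("driver: " + onDriver.map()); // prints "driver: 42"
        FakeRdd onExecutor = roundTrip(onDriver);
        System.out.println("ctx after round trip: " + onExecutor.ctx); // prints "ctx after round trip: null"
        try {
            onExecutor.map();
        } catch (NullPointerException e) {
            System.out.println("map() after round trip threw NullPointerException");
        }
    }
}
```

The copy produced by roundTrip behaves like objRdd inside a task: its driver handle is gone, and the first method that needs it fails with a bare NullPointerException.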



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


