spark-user mailing list archives

From Michael Albert <m_albert...@yahoo.com.INVALID>
Subject Re: Wired Problem: Task not serializable[Spark Streaming]
Date Tue, 09 Jun 2015 02:23:37 GMT
Note that in Scala, "return" is a non-local return: https://tpolecat.github.io/2014/05/09/return.html

So that "return" is *NOT* returning from the anonymous function; it is attempting to return from the enclosing method, i.e., "main", which is running on the driver, not on the workers. So on the workers, there is nowhere for the "return" to jump to. Hence it is not serializable.
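A small plain-Scala sketch (no Spark needed) of what "non-local" means here. `NonLocalReturnDemo`, `containsAbc`, `saved`, `install` and `escapedReturnThrows` are hypothetical names made up for illustration. As far as I know, the compiler implements such a `return` by throwing a `scala.runtime.NonLocalReturnControl` exception tagged with a key object that the enclosing method creates and catches. The closure therefore has to capture that key, a plain `java.lang.Object`, which is not `Serializable`; that is most likely the object Spark's closure serialization rejects. The second method shows what happens when the closure runs after its enclosing method has already returned, which is roughly the situation on a worker.

```scala
import scala.runtime.NonLocalReturnControl

// Assumption: Scala 2.x. Scala 3 still compiles this, with a deprecation
// warning, since non-local returns are deprecated there.
object NonLocalReturnDemo {
  // The `return` sits inside the lambda passed to foreach, yet it exits
  // containsAbc itself, not just the lambda: a non-local return.
  def containsAbc(m: Map[String, String]): Boolean = {
    m.foreach { entry =>
      if (entry._1 == "abc") return true
    }
    false
  }

  // Holds a closure that outlives the method that created it.
  private var saved: () => Unit = () => ()

  // The `return` inside this lambda targets install(), its enclosing method.
  private def install(): Unit = {
    saved = () => return
  }

  // Runs the saved closure after install() has finished. There is no live
  // install() frame left to return to, so the return escapes as an
  // uncaught NonLocalReturnControl exception, which we catch here.
  def escapedReturnThrows(): Boolean = {
    install()
    try { saved(); false }
    catch { case _: NonLocalReturnControl[_] => true }
  }

  def main(args: Array[String]): Unit = {
    println(containsAbc(Map("abc" -> "abc", "x" -> "x")))
    println(containsAbc(Map("x" -> "x")))
    println(escapedReturnThrows())
  }
}
```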
Good luck.
-Mike

      From: "bit1129@163.com" <bit1129@163.com>
 To: user <user@spark.apache.org> 
 Sent: Monday, June 8, 2015 10:01 PM
 Subject: Re: Wired Problem: Task not serializable[Spark Streaming]
   
Could someone help explain what leads to the Task not serializable issue? Thanks.
bit1129@163.com
From: bit1129@163.com
Date: 2015-06-08 19:08
To: user
Subject: Wired Problem: Task not serializable[Spark Streaming]

Hi,

With the following simple code, I got an exception complaining that the Task is not serializable. The root cause is that I use return in the map foreach loop.

Why does "return" in the map foreach loop cause the Task not serializable problem? Can someone please explain this to me?


import org.apache.spark.SparkConf
import org.apache.spark.streaming._

import scala.collection.mutable

object NetCatStreamingWordCount3 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetCatWordCount")
    conf.setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionIterable => {
        val map = mutable.Map[String, String]()
        while (partitionIterable.hasNext) {
          val v = partitionIterable.next()
          map += v -> v
        }

        map.foreach(entry => {
          if (entry._1.equals("abc")) {
            return;  // This is the root cause of the Task not serializable error.
          }
        })

      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
bit1129@163.com
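One way to restructure the partition body from the question without any `return` is sketched below. `PartitionLogic` and `process` are hypothetical names, and returning a `Boolean` is an assumption made so the caller (and a test) can see whether "abc" was found; the original body simply stopped. `exists` stops at the first matching entry on its own, so no early exit via `return` is needed, and the lambda no longer captures anything from `main`.

```scala
import scala.collection.mutable

object PartitionLogic {
  // Return-free version of the per-partition body from the question.
  // Hypothetical: reports whether the key "abc" was seen.
  def process(partition: Iterator[String]): Boolean = {
    val map = mutable.Map[String, String]()
    partition.foreach(v => map += v -> v)
    // exists short-circuits on the first match, replacing foreach + return.
    map.exists(entry => entry._1 == "abc")
  }

  def main(args: Array[String]): Unit = {
    println(process(Iterator("abc", "def")))
    println(process(Iterator("def")))
  }
}
```

In the streaming job it could then be called as `rdd.foreachPartition(partition => PartitionLogic.process(partition))`. Because `PartitionLogic` is a top-level object, the executor reaches it statically and the closure does not drag in any state from the driver's `main`.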


  
Mime
View raw message