spark-user mailing list archives

From "Sela, Amit" <ANS...@paypal.com.INVALID>
Subject Re: NullPointerException when adding to accumulator
Date Thu, 15 Oct 2015 06:03:04 GMT
Accumulators seem to work in foreachRDD but not in DStream operations such as map.
In addition, if I try to access the accumulator within foreachRDD{foreachPartition{foreach{//NPE//}}}
I get an NPE as well, unless I define a new accumulator inside each foreachRDD, like this:
foreachRDD{//new accumulator// foreachPartition{foreach{//OK//}}}
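
A rough sketch of the pattern that does work for me (illustrative only, reusing the ssc and kafkaStream from my code below; the "batch_count" name is made up):

// Incrementing an accumulator created outside the stream, inside map, is what throws the NPE on YARN.
// Creating a fresh accumulator inside each foreachRDD and incrementing it there is fine:
kafkaStream.foreachRDD { rdd =>
  val batchCount = ssc.sparkContext.accumulator(0, "batch_count") // new accumulator per batch
  rdd.foreachPartition { partition =>
    partition.foreach { _ => batchCount += 1 } // OK here
  }
  println(s"batch_count=${batchCount.value}") // read on the driver after the action completes
}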


From: Adrian Tanase
Date: Wednesday, October 14, 2015 at 8:19 PM
To: "Sela, Amit"
Cc: Ted Yu, "user@spark.apache.org"
Subject: Re: NullPointerException when adding to accumulator

Can you try to use foreachRDD instead of print and access the accumulator value in there?

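Something along these lines (just a sketch, reusing your kafkaStream and messageCount; the count() is only there to force the map for each batch):

kafkaStream.map { e => messageCount += 1; e }
  .foreachRDD { rdd =>
    rdd.count() // action that triggers the map for this batch
    println(s"message_count=${messageCount.value}") // runs on the driver
  }
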
Could it be that you are accessing the accumulator before it's initialized or after it was
cleaned up?

Sent from my iPhone

On 14 Oct 2015, at 17:46, Sela, Amit <ANSELA@paypal.com.INVALID>
wrote:

This is not the entire code (obtaining the Kafka stream is in EventsReader), but the relevant
part is:

val kafkaStream = EventsReader.getRawStreamByEvent(ssc, classOf[LoginEvent]).cache()
val messageCount = ssc.sparkContext.accumulator(0, "message_count")
kafkaStream.map(stream => {
  messageCount += 1 // line number: (31)
  stream
}).print(1)
ssc.awaitTerminationOrTimeout(60000)
println(s"${messageCount.name.get}=${messageCount.value}")


This is the stack trace:

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:722)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:721)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at com.paypal.risk.platform.nextgen.LoginCountMessages$.increment(LoginCountMessages.scala:31)
        at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:35)
        at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)





From: Ted Yu
Date: Wednesday, October 14, 2015 at 5:38 PM
To: "Sela, Amit"
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: NullPointerException when adding to accumulator

Can you show your code snippet with the complete stack trace?

Thanks

On Wed, Oct 14, 2015 at 7:30 AM, Sela, Amit <ANSELA@paypal.com.invalid>
wrote:
I'm running a simple streaming application that reads from Kafka, maps the events and prints
them, and I'm trying to use accumulators to count the number of mapped records.

While this works in standalone mode (IDE), when submitting to YARN I get a NullPointerException
on accumulator.add(1) or accumulator += 1.

Is anyone using accumulators in .map() with Spark 1.5 and YARN?

Thanks,
Amit



