This is not the entire code (obtaining the Kafka stream is in EventReader), but the relevant
part is:
val kafkaStream = EventsReader.getRawStreamByEvent(ssc, classOf[LoginEvent]).cache()
val messageCount = ssc.sparkContext.accumulator(0, "message_count")
kafkaStream.map(stream => {
messageCount += 1 // line number: (31)
stream
}).print(1)
ssc.awaitTerminationOrTimeout(60000)
println(s"${messageCount.name.get}=${messageCount.value}")
This is the stack trace:
river stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1298)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:722)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:721)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at com.paypal.risk.platform.nextgen.LoginCountMessages$.increment(LoginCountMessages.scala:31)
at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:35)
at com.paypal.risk.platform.nextgen.LoginCountMessages$$anonfun$1.apply(LoginCountMessages.scala:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
From: Ted Yu
Date: Wednesday, October 14, 2015 at 5:38 PM
To: "Sela, Amit"
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: NullPointerException when adding to accumulator
Can you show your code snippet with the complete stack trace ?
Thanks
On Wed, Oct 14, 2015 at 7:30 AM, Sela, Amit <ANSELA@paypal.com.invalid<mailto:ANSELA@paypal.com.invalid>>
wrote:
I'm running a simple streaming application that reads from Kafka, maps the events and prints
them and I'm trying to use accumulators to count the number of mapped records.
While this works in standalone(IDE), when submitting to YARN I get NullPointerException on
accumulator.add(1) or accumulator += 1
Anyone using accumulators in .map() with Spark 1.5 and YARN ?
Thanks,
Amit
|