spark-issues mailing list archives

From "msnreddy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
Date Sun, 04 Mar 2018 16:23:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16385221#comment-16385221
] 

msnreddy commented on SPARK-5206:
---------------------------------

A broadcast variable cannot be used with mapWithState if the Spark Streaming application
needs to recover from a checkpoint directory. In that case it can only be used inside
output operations, because a SparkContext is required to lazily initialize the broadcast.


 class JavaWordBlacklist {

   private static volatile Broadcast<List<String>> instance = null;

   public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaWordBlacklist.class) {
         if (instance == null) {
           List<String> wordBlacklist = Arrays.asList("a", "b", "c");
           instance = jsc.broadcast(wordBlacklist);
         }
       }
     }
     return instance;
   }
 }

 class JavaDroppedWordsCounter {

   private static volatile LongAccumulator instance = null;

   public static LongAccumulator getInstance(JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (JavaDroppedWordsCounter.class) {
         if (instance == null) {
           instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
         }
       }
     }
     return instance;
   }
 }

 wordCounts.foreachRDD((rdd, time) -> {
   // Get or register the blacklist Broadcast
   Broadcast<List<String>> blacklist =
       JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
   // Get or register the droppedWordsCounter Accumulator
   LongAccumulator droppedWordsCounter =
       JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
   // Use blacklist to drop words and use droppedWordsCounter to count them
   String counts = rdd.filter(wordCount -> {
     if (blacklist.value().contains(wordCount._1())) {
       droppedWordsCounter.add(wordCount._2());
       return false;
     } else {
       return true;
     }
   }).collect().toString();
   String output = "Counts at time " + time + " " + counts;
   System.out.println(output);
 });
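The getInstance helpers above work around the recovery problem with the classic double-checked
locking idiom: the object is created lazily, at most once, the first time any batch actually
needs it, so after a checkpoint restart it is simply re-created on demand. A minimal Spark-free
sketch of that idiom (the LazyCounter class here is a hypothetical stand-in for the
Broadcast/Accumulator singletons, not part of Spark):

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyCounter {
    // volatile is required for double-checked locking to be safe in Java.
    private static volatile AtomicInteger instance = null;
    static int initCount = 0; // counts how many times initialization actually ran

    public static AtomicInteger getInstance() {
        if (instance == null) {                   // first check, no lock taken
            synchronized (LazyCounter.class) {
                if (instance == null) {           // second check, under the lock
                    initCount++;
                    instance = new AtomicInteger(0);
                }
            }
        }
        return instance;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        // Hammer getInstance() from several threads at once.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> LazyCounter.getInstance().incrementAndGet());
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        // Exactly one initialization, eight increments.
        System.out.println(LazyCounter.initCount + " " + LazyCounter.getInstance().get());
        // prints "1 8"
    }
}
```

The same reasoning is why the workaround only helps inside output operations such as
foreachRDD: that is the only place where a live SparkContext is available on the driver
after recovery to perform the lazy creation.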

 

> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>
>                 Key: SPARK-5206
>                 URL: https://issues.apache.org/jira/browse/SPARK-5206
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.1.0
>            Reporter: vincent ye
>            Priority: Major
>
> I got the following exception while my streaming application restarts from a crash using
the checkpoint:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler:
Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
> 	at scala.collection.MapLike$class.default(MapLike.scala:228)
> 	at scala.collection.AbstractMap.default(Map.scala:58)
> 	at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered with the singleton Accumulators object in Line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed in the driver once. But when the application is recovered
from a checkpoint, it won't be executed in the driver. So when the driver processes it at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
it can't find the Accumulator, because it was not re-registered during the recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

