My workers are going OOM (out of memory) over time. I am running a streaming job on Spark 1.4.0. 
Here is the heap dump summary from one of the workers: 

16,802 instances of "org.apache.spark.deploy.worker.ExecutorRunner", loaded by "sun.misc.Launcher$AppClassLoader @ 0xdff94088" occupy 488,249,688 (95.80%) bytes. These instances are referenced from one instance of "java.lang.Object[]", loaded by "<system class loader>" 

Keywords 
org.apache.spark.deploy.worker.ExecutorRunner 
java.lang.Object[] 
sun.misc.Launcher$AppClassLoader @ 0xdff94088 

Is this caused by the bug discussed here?
http://apache-spark-developers-list.1001551.n3.nabble.com/Worker-memory-leaks-td13341.html
https://issues.apache.org/jira/browse/SPARK-9202
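
For context, my (possibly wrong) reading of that JIRA is that the standalone Worker keeps bookkeeping for finished executors in an in-memory collection that is never trimmed, which would match the 16,802 retained ExecutorRunner instances above. The Scala sketch below only illustrates that suspected pattern and a bounded alternative; the class and field names are made up, not the actual Spark source.

import scala.collection.mutable

// Illustrative stand-in for the per-executor state a Worker holds on to.
case class FinishedExecutorInfo(appId: String, execId: Int)

// Suspected pattern: every finished executor adds an entry that is never removed,
// so a long-running streaming app that churns executors grows this without bound.
class LeakyWorkerBookkeeping {
  val finishedExecutors = mutable.HashMap.empty[String, FinishedExecutorInfo]

  def onExecutorFinished(info: FinishedExecutorInfo): Unit =
    finishedExecutors(s"${info.appId}/${info.execId}") = info
}

// The kind of fix I would expect: keep only the most recent N entries.
class BoundedWorkerBookkeeping(retained: Int) {
  val finishedExecutors = mutable.LinkedHashMap.empty[String, FinishedExecutorInfo]

  def onExecutorFinished(info: FinishedExecutorInfo): Unit = {
    finishedExecutors(s"${info.appId}/${info.execId}") = info
    // Evict the oldest entries once the cap is exceeded.
    while (finishedExecutors.size > retained)
      finishedExecutors -= finishedExecutors.head._1
  }
}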

Also, 
I am getting the error below continuously if a worker/executor dies on any node in my Spark cluster. 
Even if I restart the worker, the error does not go away; I have to force-kill my streaming job and restart it to fix the issue. Is this a bug? 
I am using Spark 1.4.0. 

MY_IP in the logs below is the IP of the worker node that failed. 

15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194218 - Ask timed out on [Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [120000 ms]} 
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [120000 ms] 
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) 
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) 
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) 
        at java.lang.Thread.run(Thread.java:745) 
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194217 - Ask timed out on [Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [120000 ms]} 
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [120000 ms] 
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) 
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) 
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) 
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) 
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) 
        at java.lang.Thread.run(Thread.java:745) 
15/09/03 11:29:11 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 16723 
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194216 - Ask timed out on [Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]] after [120000 ms]} 

It is easily reproducible if I manually stop a worker on one of my nodes: 
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 329 
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 333 
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 334 

The errors do not stop even if I start the worker again. 
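
As mentioned above, the only thing that clears this today is force-killing the streaming job. A cleaner stop I could try instead is roughly the sketch below (a shutdown hook that asks the StreamingContext to stop gracefully; I am not sure how this interacts with Spark's own shutdown handling in 1.4, so treat it as an assumption):

import org.apache.spark.streaming.StreamingContext

// Sketch: on a normal kill (SIGTERM), finish in-flight batches and then stop the
// SparkContext, instead of killing the driver process outright.
def installGracefulStop(ssc: StreamingContext): Unit = {
  sys.addShutdownHook {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}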

Follow-up question: if my streaming job has consumed some events from a Kafka topic and they are still pending to be scheduled because of a processing delay, will force-killing the streaming job lose the data that has not yet been scheduled? 
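
For reference, the kind of setup I assume would avoid losing those events is a checkpointed direct Kafka stream, roughly as sketched below (broker, topic, checkpoint path, and the trivial processing step are placeholders, not my actual job):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: direct Kafka stream + checkpointing, so offsets are tracked by Spark
// and a restart resumes from the checkpoint instead of losing un-scheduled batches.
val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-streaming-sketch")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)

  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val topics = Set("my_topic")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  stream.map(_._2).count().print()   // trivial stand-in for the real processing
  ssc
}

// On restart, recover from the checkpoint if it exists instead of starting fresh.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()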



--
VARUN SHARMA
Flipkart
Bangalore