spark-user mailing list archives

From Apoorva Sareen <apoorva.sar...@gmail.com>
Subject Re: Spark off heap memory leak on Yarn with Kafka direct stream
Date Tue, 14 Jul 2015 01:10:51 GMT
It happens irrespective of whether there is traffic on the Kafka topic or not. Also, there is no clue I could see in the heap space: the heap looks healthy and stable. It's something off-heap that is constantly growing. I also checked the JNI reference count from the dumps, which appears stable (it is constantly getting GCed), and tried to limit the size of metaspace and direct memory using the following:

--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M"

but with no success. Thanks for offering help.
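
For completeness, a way to break down where the native allocations are going would be Java 8's Native Memory Tracking (a rough sketch, untested on this setup):

--conf spark.driver.extraJavaOptions="-XX:NativeMemoryTracking=summary" \
--conf spark.executor.extraJavaOptions="-XX:NativeMemoryTracking=summary"

and then, on the node hosting the container (where <pid> is the container JVM's process id):

jcmd <pid> VM.native_memory summary

That should at least show whether the growth is in metaspace, thread stacks, or internal allocations such as direct buffers, as opposed to native-library allocations the JVM cannot see.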

Regards,
Apoorva

> On 14-Jul-2015, at 12:43 am, Cody Koeninger <cody@koeninger.org> wrote:
> 
> Does the issue only happen when you have no traffic on the topic?
> 
> Have you profiled to see what's using heap space?
> 
> 
> On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen <apoorva.sareen@gmail.com> wrote:
> Hi,
> 
> I am running Spark Streaming 1.4.0 on YARN (Apache distribution 2.6.0) with Java 1.8.0_45 and the Kafka direct stream. I am also using Spark with Scala 2.11 support.
> 
> The issue I am seeing is that both the driver and executor containers gradually increase their physical memory usage until YARN kills the container. I have configured up to 192M of heap and 384M of off-heap space for my driver, but it eventually runs out.
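> 
> (If I understand the YARN container sizing correctly, that works out to limits of 192m + 384m = 576m for the driver container (driver-memory plus spark.yarn.driver.memoryOverhead) and 128m + 256m = 384m for the executor container, and it is these limits that the containers eventually exceed.)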
> 
> The heap memory appears to be fine, with regular GC cycles. No OutOfMemoryError is ever encountered in any of these runs.
> 
> In fact, I am not generating any traffic on the Kafka topics, yet this still happens. Here is the code I am using:
> 
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
> 
> object SimpleSparkStreaming extends App {
> 
>   val conf = new SparkConf()
>   // Batch window size is passed in via a custom --conf property (default 1s)
>   val ssc = new StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
>   ssc.checkpoint("checkpoint")
> 
>   // Topic and broker list are also passed in via custom --conf properties
>   val topics = Set(conf.get("spark.kafka.topic.name"))
>   val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
> 
>   // Receiver-less direct stream: one RDD partition per Kafka partition
>   val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, topics)
> 
>   // Print each message value; runs on the executors
>   kafkaStream.foreachRDD(rdd => {
>     rdd.foreach(x => println(x._2))
>   })
>   kafkaStream.print()
> 
>   ssc.start()
>   ssc.awaitTermination()
> }
> I am running this on CentOS 7. The spark-submit command I use is the following:
> 
> ./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
> --conf spark.yarn.executor.memoryOverhead=256 \
> --conf spark.yarn.driver.memoryOverhead=384 \
> --conf spark.kafka.topic.name=test \
> --conf spark.kafka.broker.list=172.31.45.218:9092 \
> --conf spark.batch.window.size=1 \
> --conf spark.app.name="Simple Spark Kafka application" \
> --master yarn-cluster \
> --num-executors 1 \
> --driver-memory 192m \
> --executor-memory 128m \
> --executor-cores 1 \
> /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar
> Any help is greatly appreciated.
> 
> Regards,
> 
> Apoorva