spark-issues mailing list archives

From "JESSE CHEN (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-13288) [1.6.0] Memory leak in Spark streaming
Date Tue, 23 Feb 2016 19:10:18 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159450#comment-15159450 ]

JESSE CHEN commented on SPARK-13288:
------------------------------------

I enabled "spark.memory.useLegacyMode", and the out-of-memory errors still occurred. So it looks like the automatic
memory management has nothing to do with this streaming memory leak issue. See details below:

Spark properties used, including those specified through
 --conf and those from the properties file /TestAutomation/spark-1.6.0/conf/spark-defaults.conf:
  spark.yarn.queue -> default
  spark.history.kerberos.principal -> none
  spark.driver.memory -> 12g
  spark.yarn.max.executor.failures -> 3
  spark.yarn.historyServer.address -> bigaperf132.svl.ibm.com:18080
  spark.eventLog.enabled -> true
  spark.history.ui.port -> 18080
  spark.yarn.applicationMaster.waitTries -> 10
  spark.yarn.scheduler.heartbeat.interval-ms -> 5000
  spark.yarn.executor.memoryOverhead -> 384
  spark.yarn.submit.file.replication -> 3
  spark.driver.extraJavaOptions -> -Diop.version={{iop_full_version}}
  spark.yarn.containerLauncherMaxThreads -> 25
  spark.memory.useLegacyMode -> true
  spark.yarn.driver.memoryOverhead -> 384
  spark.history.kerberos.keytab -> none
  spark.yarn.am.extraJavaOptions -> -Diop.version={{iop_full_version}}
  spark.eventLog.dir -> hdfs://bigaperf132.svl.ibm.com:8020/iop/apps/4.1.0.0/spark/logs/history-server
  spark.yarn.preserve.staging.files -> false
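
For context, the relevant settings can also be mirrored programmatically; the sketch below is illustrative only (the actual test harness is not attached here), and the executor heap size is an assumption inferred from the 4.4 GB container cap in the log below. Driver-side memory settings are omitted because they have to be set before the driver JVM starts (via spark-submit or spark-defaults.conf).

    import org.apache.spark.SparkConf

    // Illustrative sketch only: mirrors the relevant spark-defaults.conf entries above.
    val conf = new SparkConf()
      .setAppName("streaming-memory-leak-test")            // hypothetical app name
      .set("spark.memory.useLegacyMode", "true")           // the property under test in this comment
      .set("spark.yarn.executor.memoryOverhead", "384")    // as configured above
      .set("spark.eventLog.enabled", "true")
    // Assumption: executors were launched with roughly 4 GB of heap
    // (e.g. --executor-memory 4g), which is consistent with the 4.4 GB
    // container limit reported by YARN in the log below.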

22 minutes into the test, I got "Container killed by YARN for exceeding memory limits" errors:

16/02/22 17:15:21 ERROR cluster.YarnScheduler: Lost executor 1 on bigaperf132.svl.ibm.com:
Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
16/02/22 17:15:21 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1940.0 (TID 5619,
bigaperf132.svl.ibm.com): ExecutorLostFailure (executor 1 exited caused by one of the running
tasks) Reason: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.4 GB physical
memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/02/22 17:15:24 WARN scheduler.TaskSetManager:
Lost task 9.0 in stage 1948.0 (TID 5633, bigaperf134.svl.ibm.com): FetchFailed(null, shuffleId=330,
mapId=-1, reduceId=9, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
330
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
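
The 4.4 GB cap in the error above is consistent with the configured overhead: assuming the executors ran with about 4 GB of heap (the exact --executor-memory value is not shown in this excerpt), the YARN container size works out as follows (illustrative arithmetic only):

    // Back-of-the-envelope check of the container limit YARN reports above.
    val executorMemoryMb = 4096                        // assumed --executor-memory 4g
    val memoryOverheadMb = 384                         // spark.yarn.executor.memoryOverhead from the config above
    val containerLimitMb = executorMemoryMb + memoryOverheadMb
    // 4096 + 384 = 4480 MB ~= 4.4 GB, matching "4.5 GB of 4.4 GB physical memory used"

Raising spark.yarn.executor.memoryOverhead, as the message suggests, only raises that cap; if the streaming job is genuinely leaking, the containers would still be killed eventually, just later.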


From the executor logs:

16/02/22 18:17:07 INFO spark.MapOutputTrackerWorker: Got the output locations
16/02/22 18:17:07 INFO storage.ShuffleBlockFetcherIterator: Getting 5 non-empty blocks out
of 5 blocks
16/02/22 18:17:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1
outstanding blocks 
java.io.IOException: Failed to connect to bigaperf133.svl.ibm.com/9.30.104.155:53729
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:316)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:263)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)





> [1.6.0] Memory leak in Spark streaming
> --------------------------------------
>
>                 Key: SPARK-13288
>                 URL: https://issues.apache.org/jira/browse/SPARK-13288
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.6.0
>         Environment: Bare metal cluster
> RHEL 6.6
>            Reporter: JESSE CHEN
>              Labels: streaming
>
> Streaming in 1.6 seems to have a memory leak.
> Running the same streaming app in Spark 1.5.1 and 1.6, all things being equal, 1.6 showed a gradually increasing processing time.
> The app is simple: 1 Kafka receiver of a tweet stream and 20 executors processing the tweets in 5-second batches.
> Spark 1.5.0 handled this smoothly and did not show increasing processing time in the 40-minute test, but 1.6 showed increasing processing time about 8 minutes into the test. Please see the chart here:
> https://ibm.box.com/s/7q4ulik70iwtvyfhoj1dcl4nc469b116
> I captured heap dumps in the two versions and compared them. I noticed the byte arrays (class [B) are using about 50X more space in 1.6.0.
> Here are some of the top classes in the heap histogram and their references.
> Heap Histogram (all classes, excluding platform)
>
> 1.6.0 Streaming
>   Class                            Instance Count   Total Size
>   class [B                         8453             3,227,649,599
>   class [C                         44682            4,255,502
>   class java.lang.reflect.Method   9059             1,177,670
>
> 1.5.1 Streaming
>   Class                            Instance Count   Total Size
>   class [B                         5095             62,938,466
>   class [C                         130482           12,844,182
>   class java.lang.String           130171           1,562,052
>
> References by Type
>   1.6.0: class [B [0x640039e38]
>   1.5.1: class [B [0x6c020bb08]
>
> Referrers by Type
>
> 1.6.0 Streaming
>   Class                                Count
>   java.nio.HeapByteBuffer              3239
>   sun.security.util.DerInputBuffer     1233
>   sun.security.util.ObjectIdentifier   620
>   [Ljava.lang.Object;                  408
>
> 1.5.1 Streaming
>   Class                                Count
>   sun.security.util.DerInputBuffer     1233
>   sun.security.util.ObjectIdentifier   620
>   [[B                                  397
>   java.lang.reflect.Method             326
> ----
> The total size of class [B is about 3 GB in 1.6.0 but only about 60 MB in 1.5.1.
> The java.nio.HeapByteBuffer referrer class did not show up near the top in 1.5.1.
> I have also placed the jstack output for 1.5.1 and 1.6.0 online; you can get it here:
> https://ibm.box.com/sparkstreaming-jstack160
> https://ibm.box.com/sparkstreaming-jstack151
> Jesse 
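
For reference, here is a minimal sketch of the shape of the app described in the issue above (one receiver-based Kafka stream processed in 5-second batches). The class, topic, ZooKeeper quorum, and consumer-group names below are illustrative assumptions, not the actual test code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object TweetStreamSketch {                               // hypothetical class name
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("tweet-stream-sketch")
        val ssc  = new StreamingContext(conf, Seconds(5))    // 5-second batches, per the issue

        // One receiver-based Kafka stream; topic and quorum are placeholders.
        val tweets = KafkaUtils.createStream(
          ssc,
          "zkhost:2181",                                     // hypothetical ZooKeeper quorum
          "tweet-consumer-group",                            // hypothetical consumer group
          Map("tweets" -> 1))                                // one receiver thread on one topic

        // Placeholder per-batch processing; the real job does more work on the tweets.
        tweets.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }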



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
