spark-issues mailing list archives

From "SaintBacchus (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container
Date Sun, 01 Mar 2015 05:43:04 GMT

     [ https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

SaintBacchus updated SPARK-6056:
--------------------------------
    Description: 
No matter whether we set `preferDirectBufs` or limit the number of Netty threads, Spark cannot limit its use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty allocates an off-heap (direct) buffer of the same size as the on-heap buffer it is about to write.
So for every buffer we want to transfer, the same amount of off-heap memory gets allocated.
Once the allocated off-heap memory reaches the overhead memory capacity configured for the container in YARN, the executor is killed.
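For illustration only, here is a minimal Scala sketch of that copy pattern using Netty's public ByteBuf API. It is not the actual AbstractNioByteChannel code; the allocator, the buffer size, and the helper name `copyToDirect` are stand-ins:
{code:title=DirectCopySketch.scala|borderStyle=solid}
import io.netty.buffer.{ByteBuf, ByteBufAllocator, PooledByteBufAllocator, Unpooled}

// A heap buffer holding the bytes to be written to the channel
// (e.g. the serialized bytes of a cached block).
val heapBuf: ByteBuf = Unpooled.wrappedBuffer(new Array[Byte](10 * 1024 * 1024))

// Roughly what the Netty write path does with a non-direct buffer, per the
// description above: allocate a direct (off-heap) buffer of the same readable
// size and copy the bytes into it.
def copyToDirect(alloc: ByteBufAllocator, buf: ByteBuf): ByteBuf = {
  if (buf.isDirect) {
    buf                                            // already off-heap, nothing to copy
  } else {
    val readable = buf.readableBytes
    val directBuf = alloc.directBuffer(readable)   // off-heap allocation of the same size
    directBuf.writeBytes(buf, buf.readerIndex, readable)
    buf.release()
    directBuf
  }
}

val direct = copyToDirect(PooledByteBufAllocator.DEFAULT, heapBuf)
println(s"isDirect = ${direct.isDirect}, size = ${direct.readableBytes}")
direct.release()
{code}
With many buffers in flight at once, these direct allocations add up even though the heap stays within its limit, which is what the steps below demonstrate.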
I wrote some simple code to reproduce it:
{code:title=test.scala|borderStyle=solid}
import org.apache.spark.storage._
import org.apache.spark._

// Cache roughly 110 MB of 10 MB byte arrays across 10 partitions on the executor.
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10 * 1024 * 1024)).persist
bufferRdd.count

val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager

// Fetch one cached block through the block manager (over Netty when the block
// lives on a remote executor) and sum the lengths of its byte arrays.
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

// Run `f` `count` times in parallel on a fork-join pool of size `parallel`.
def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)

  tpool.shutdown()
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
{code}
Steps to reproduce:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. Run `:load test.scala` in spark-shell.
3. Use the following command to watch the executor process on the slave node:
{code}
pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid
{code}
4. Run `test_driver(20, 100)(test)` in spark-shell.
5. Watch the output of the command on the slave node (alternatively, see the direct-buffer sampling sketch after these steps).
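As a rough alternative to top, the JVM's NIO direct buffer pool can be sampled through the standard BufferPoolMXBean. This is only a sketch: it reports the pool of whatever JVM it runs in, so it would have to run inside the executor (or be read over remote JMX) to observe the executor's direct memory, and the polling interval is an arbitrary choice:
{code:title=DirectPoolMonitor.scala|borderStyle=solid}
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

// Print how much off-heap memory NIO direct buffers currently hold in this JVM.
def printDirectMemory(): Unit = {
  val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
  pools.filter(_.getName == "direct").foreach { p =>
    println(s"direct buffers: count=${p.getCount}, used=${p.getMemoryUsed / (1024 * 1024)} MB")
  }
}

// Sample once a second for a minute while test_driver is running.
for (_ <- 1 to 60) {
  printDirectMemory()
  Thread.sleep(1000)
}
{code}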

If multiple threads fetch the block concurrently, the executor's physical memory soon exceeds the limit set by spark.yarn.executor.memoryOverhead (with about 20 concurrent fetches of a 10-20 MB cached block, the serving executor can need several hundred MB of direct buffers at once).
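For reference, a sketch of the settings referred to above, expressed as a SparkConf. Per this report the Netty-related ones do not stop the direct-buffer growth, and raising spark.yarn.executor.memoryOverhead only raises the threshold at which YARN kills the executor. The thread-related keys are an assumption about what "limit the number of threads" refers to, and all values are illustrative:
{code:title=ConfSketch.scala|borderStyle=solid}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // The `preferDirectBufs` switch mentioned above; per the report it does not
  // prevent the off-heap growth.
  .set("spark.shuffle.io.preferDirectBufs", "false")
  // Assumed knobs for limiting the Netty transfer threads (illustrative values).
  .set("spark.shuffle.io.serverThreads", "8")
  .set("spark.shuffle.io.clientThreads", "8")
  // The overhead limit (in MB) that the executor's physical memory exceeds.
  .set("spark.yarn.executor.memoryOverhead", "1024")
{code}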



> Unlimit offHeap memory use cause RM killing the container
> ---------------------------------------------------------
>
>                 Key: SPARK-6056
>                 URL: https://issues.apache.org/jira/browse/SPARK-6056
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.1
>            Reporter: SaintBacchus
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

