spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Sean Owen (JIRA)" <>
Subject [jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
Date Fri, 02 Sep 2016 19:50:20 GMT


Sean Owen commented on SPARK-17381:

The only thing i can think of that accumulates row-like data are the ColumnStats of an InMemoryRelation.
But it's just accumulating things like max/min and count. You could see a row with a string
in there but that much is probably normal.

What happens if you set spark.sql.ui.retainedExecutions to something low like 10? As far as
I can see it's normal to retain this data about executions, but, if you have lots of stages
with lots of tasks then this could become huge fast.

> Memory leak  org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -------------------------------------------------------------
>                 Key: SPARK-17381
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version	1.8.0_101 (Oracle Corporation)
> Scala Version	version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala. OS: Ubuntu
>            Reporter: Joao Duarte
> I am running a Spark Streaming application from a Kinesis stream. After some hours running
it gets out of memory. After a driver heap dump I found two problems:
> 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was
a problem before: 
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to
run the code below:
> {code}
>     val dstream = ssc.union(kinesisStreams)
>     dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
>       //load data
>       val toyDF = =>
>         (1, "data","more data "
>         ))
>         .toDF("Num", "Data", "MoreData" )
>       toyDF.agg(sum("Num")).first().get(0)
>     }
>     )
> {code}
> 2) huge amount of Array[Byte] (9Gb+)
> After some analysis, I noticed that most of the Array[Byte] where being referenced by
objects that were being referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte]
were basically text that were loaded in the executors, so they should never be in the driver
at all!
> Still could not replicate the 2nd problem with a simple code (the original was complex
with data coming from S3, DynamoDB and other databases). However, when I debug the application
I can see that in Executor.scala, during reportHeartBeat(),  the data that should not be sent
to the driver is being added to "accumUpdates" which, as I understand, will be sent to the
driver for reporting.
> To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)"
 contains a GenericInternalRow with a lot of data that should not go to the driver. The path
would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar
(if not the same) to the data I see when I do a driver heap dump. 
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I
would have less of this undesirable data in the driver and I could run my streaming app for
a long period of time, but I think there will always be some performance lost.

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message