spark-issues mailing list archives

From Bartosz Mścichowski (JIRA) <j...@apache.org>
Subject [jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
Date Fri, 20 Oct 2017 11:55:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212522#comment-16212522 ]

Bartosz Mścichowski commented on SPARK-21492:
---------------------------------------------

Here's a script that exposes a memory leak during SortMergeJoin in Spark 2.2.0; maybe it will be helpful.
The leak shows up when the following code is executed in a local spark-shell. {{--conf spark.sql.autoBroadcastJoinThreshold=-1}} may be needed to make sure the join is planned as a sort-merge join rather than a broadcast join.
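For reference, the shell can be started with that setting like this (the spark-shell path depends on the installation):

{noformat}
./bin/spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
{noformat}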

{noformat}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val table1Key = "t1_key"
val table1Value = "t1_value"

val table2Key = "t2_key"
val table2Value = "t2_value"

val table1Schema = StructType(List(
    StructField(table1Key, IntegerType),
    StructField(table1Value, DoubleType)
))

val table2Schema = StructType(List(
    StructField(table2Key, IntegerType),
    StructField(table2Value, DoubleType)
))

val table1 = spark.sqlContext.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(Seq(
        Row(1, 2.0)
    )),
    schema = table1Schema
)

val table2 = spark.sqlContext.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(Seq(
        Row(1, 4.0)
    )),
    schema = table2Schema
)


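// Note: repartitioning by the join key before the aggregation leaves each
// side hash-partitioned on its key, so the planner only adds Sort nodes
// (no extra Exchange) between the aggregates and the SortMergeJoin -- see
// the plan printed below.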
val t1 = table1.repartition(col(table1Key)).groupBy(table1Key).avg()
val t2 = table2.repartition(col(table2Key)).groupBy(table2Key).avg()

val joinedDF = t1.join(t2).where(t1(table1Key) === t2(table2Key))

joinedDF.explain()
// == Physical Plan ==
// *SortMergeJoin [t1_key#2], [t2_key#9], Inner
// :- *Sort [t1_key#2 ASC NULLS FIRST], false, 0
// :  +- *HashAggregate(keys=[t1_key#2], functions=[avg(cast(t1_key#2 as bigint)), avg(t1_value#3)])
// :     +- *HashAggregate(keys=[t1_key#2], functions=[partial_avg(cast(t1_key#2 as bigint)), partial_avg(t1_value#3)])
// :        +- Exchange hashpartitioning(t1_key#2, 200)
// :           +- *Filter isnotnull(t1_key#2)
// :              +- Scan ExistingRDD[t1_key#2,t1_value#3]
// +- *Sort [t2_key#9 ASC NULLS FIRST], false, 0
//    +- *HashAggregate(keys=[t2_key#9], functions=[avg(cast(t2_key#9 as bigint)), avg(t2_value#10)])
//       +- *HashAggregate(keys=[t2_key#9], functions=[partial_avg(cast(t2_key#9 as bigint)), partial_avg(t2_value#10)])
//          +- Exchange hashpartitioning(t2_key#9, 200)
//             +- *Filter isnotnull(t2_key#9)
//                +- Scan ExistingRDD[t2_key#9,t2_value#10]

joinedDF.show()
// The 'show' action yields a lot of:
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 8
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 9
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size = 4194304 bytes, TID = 11
{noformat}
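These warnings are logged by the executor when a task completes while memory pages are still registered with its TaskMemoryManager. If a hard failure is preferable while debugging, Spark has an internal flag, {{spark.unsafe.exceptionOnMemoryLeak}} (used mainly by Spark's own test suites), that turns the warning into a task failure, e.g.:

{noformat}
./bin/spark-shell \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.unsafe.exceptionOnMemoryLeak=true
{noformat}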


> Memory leak in SortMergeJoin
> ----------------------------
>
>                 Key: SPARK-21492
>                 URL: https://issues.apache.org/jira/browse/SPARK-21492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Zhan Zhang
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak caused
> by the Sort. The memory is not released until the task ends, and cannot be used by other
> operators, causing a performance drop or OOM.
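To illustrate the "iterator is not exhausted" scenario described above, here is a minimal sketch building on the script from the comment (hypothetical, not part of the original report): any plan that stops pulling rows early should keep the join's sort buffers allocated until the task finishes.

{noformat}
// Hypothetical: LIMIT consumes only one row of the SortMergeJoin output,
// so the sorters feeding the join are never drained; their pages are only
// reclaimed at task end, which is when the leak warning above fires.
joinedDF.limit(1).show()
{noformat}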



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
