spark-issues mailing list archives

From "Jerry Lam (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
Date Mon, 26 Oct 2015 00:59:27 GMT

    [ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14973529#comment-14973529 ]

Jerry Lam edited comment on SPARK-8890 at 10/26/15 12:58 AM:
-------------------------------------------------------------

Hi guys, sorry for injecting comments into this closed JIRA. I just want to point out that, using Spark 1.5.1, I got an OOM on the driver side after all partitions were written out (I have over 1 million partitions). The job was marked SUCCESS in the output folder, but the driver kept consuming significant CPU and memory; after several hours it died with an OOM, even though I had already configured the driver to use 6GB. The jstack of the process is as follows:
{code}
Thread 528: (state = BLOCKED)
 - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
 - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
 - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
 - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
 - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
 - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
 - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
 - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
 - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
{code}
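
For context: the top frames show the driver stuck in HadoopFsRelation's FileStatusCache.refresh/listLeafFiles, which lists every leaf file under the output path on the driver and builds one path string per file. Below is a rough, illustrative sketch of that pattern (not Spark's actual implementation; the object and method names are invented for the example) to show why a table with over a million partition directories can exhaust a 6GB driver heap:
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustrative sketch (not Spark's code) of the driver-side pattern visible in
// the stack trace: recursively list every leaf file under the table root and
// cache the statuses, keyed by their full path string.
object LeafFileListingSketch {

  // Recursively collect leaf (non-directory) FileStatus objects.
  def listLeafFiles(fs: FileSystem, paths: Seq[Path]): Seq[FileStatus] =
    paths.flatMap { p =>
      val (dirs, files) = fs.listStatus(p).partition(_.isDirectory)
      files ++ listLeafFiles(fs, dirs.map(_.getPath))
    }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val leaves = listLeafFiles(fs, Seq(new Path(args(0))))

    // The Path.toString / StringBuilder.append frames in the trace come from
    // building one path string per leaf file; with >1M partitions this cache
    // alone can account for gigabytes of driver heap.
    val cache: Map[String, FileStatus] = leaves.map(s => s.getPath.toString -> s).toMap
    println(s"cached ${cache.size} leaf file statuses on the driver")
  }
}
{code}
The point is that the entire listing, including one String per Path, is materialized on the driver, so the memory cost scales with the number of leaf files regardless of how much memory the executors have.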


> Reduce memory consumption for dynamic partition insert
> ------------------------------------------------------
>
>                 Key: SPARK-8890
>                 URL: https://issues.apache.org/jira/browse/SPARK-8890
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Michael Armbrust
>            Priority: Critical
>             Fix For: 1.5.0
>
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of table partitions is large. The problem is that we open one output writer for each partition, and when data are randomized and the number of partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active partitions is beyond a certain point (e.g. 50?).
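
To make the proposed fix concrete, here is a rough sketch of the idea (illustrative only; the Writer type, writeRows, and the maxOpenWriters threshold are invented for the example, and this is not the code that shipped in 1.5.0): keep one open writer per partition until a threshold is hit, then sort the remaining rows by partition key so they can be written with a single open writer at a time.
{code}
// Illustrative sketch of the "sort once too many partitions are active" idea
// behind SPARK-8890. Types and the maxOpenWriters threshold are invented for
// the example; this is not the implementation that shipped in Spark 1.5.0.
object DynamicPartitionWriteSketch {

  // Stand-in for a per-partition output writer (e.g. one Parquet file).
  final class Writer(val partition: String) {
    def write(row: String): Unit = println(s"[$partition] $row")
    def close(): Unit = ()
  }

  def writeRows(rows: Iterator[(String, String)], maxOpenWriters: Int = 50): Unit = {
    val open = scala.collection.mutable.Map.empty[String, Writer]
    var spilled: Option[(String, String)] = None

    // Fast path: one open writer per partition while the partition count is small.
    while (rows.hasNext && spilled.isEmpty) {
      val (part, row) = rows.next()
      if (open.contains(part) || open.size < maxOpenWriters) {
        open.getOrElseUpdate(part, new Writer(part)).write(row)
      } else {
        spilled = Some((part, row)) // threshold hit: stop opening new writers
      }
    }
    open.values.foreach(_.close())

    // Slow path: sort the remaining rows by partition key so each partition's
    // rows arrive contiguously and only one writer needs to be open at a time.
    val remaining = (spilled.iterator ++ rows).toSeq.sortBy(_._1)
    var current: Option[Writer] = None
    for ((part, row) <- remaining) {
      if (!current.exists(_.partition == part)) {
        current.foreach(_.close())
        current = Some(new Writer(part))
      }
      current.get.write(row)
    }
    current.foreach(_.close())
  }

  def main(args: Array[String]): Unit = {
    // Four rows across three partitions with a threshold of two open writers.
    val rows = Iterator(("p=1", "a"), ("p=2", "b"), ("p=3", "c"), ("p=1", "d"))
    writeRows(rows, maxOpenWriters = 2)
  }
}
{code}
Sorting bounds the number of simultaneously open writers (and their buffers) at the cost of an extra sort of the remaining rows, which is exactly the trade-off the description above proposes.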



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

