spark-issues mailing list archives

From "Liang-Chi Hsieh (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24961) sort operation causes out of memory
Date Sat, 18 Aug 2018 23:29:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584961#comment-16584961 ]

Liang-Chi Hsieh commented on SPARK-24961:
-----------------------------------------

When you run a global sort, Spark samples data for the range shuffle. How much data is sampled
per partition is controlled by the internal config {{spark.sql.execution.rangeExchange.sampleSizePerPartition}}.
Since the memory needed for sampling is determined by the sample size, the number of partitions,
and the row size, increasing the row size is what pushes the job into OOM.

So you can lower the number of partitions or reduce {{spark.sql.execution.rangeExchange.sampleSizePerPartition}}
to avoid the OOM.
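
For illustration, a minimal sketch of both knobs in Java (assuming "partition number" here refers to the sort's shuffle output partitions, which Dataset sorts take from {{spark.sql.shuffle.partitions}}; the concrete values are arbitrary examples, not tuning recommendations):

{code:java}
import org.apache.spark.sql.SparkSession;

public class SortSamplingWorkaround {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("sort-sampling-workaround")
                .master("local[4]")
                // Internal config: sample fewer rows per partition when the
                // range boundaries for the global sort are computed.
                .config("spark.sql.execution.rangeExchange.sampleSizePerPartition", "20")
                // Fewer shuffle (output) partitions for the sort also means
                // fewer sampled rows overall.
                .config("spark.sql.shuffle.partitions", "50")
                .getOrCreate();

        // ... build and sort the DataFrame as in the quoted report below ...

        spark.stop();
    }
}
{code}

Since {{spark.sql.execution.rangeExchange.sampleSizePerPartition}} is a SQL conf, it can also be changed at runtime with {{spark.conf().set(...)}} before the sort is executed.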

> sort operation causes out of memory 
> ------------------------------------
>
>                 Key: SPARK-24961
>                 URL: https://issues.apache.org/jira/browse/SPARK-24961
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.1
>         Environment: Java 1.8u144+
> Windows 10
> Spark 2.3.1 in local mode
> -Xms4g -Xmx4g
> optional: -XX:+UseParallelOldGC 
>            Reporter: Markus Breuer
>            Priority: Major
>
> A sort operation on a large RDD - one that does not fit in memory - causes an out-of-memory exception.
> I made the effect reproducible with a sample that creates large objects of about 2 MB each.
> The OOM occurs when the result is saved. I tried several StorageLevels, but whenever memory is involved
> (MEMORY_AND_DISK, MEMORY_AND_DISK_SER, none), the application runs out of memory. Only DISK_ONLY
> seems to work.
> When sort() is replaced with sortWithinPartitions(), no StorageLevel is required and the application
> succeeds (a short sketch of this variant follows the quoted report).
> {code:java}
> package de.bytefusion.examples;
> import org.apache.hadoop.io.MapFile;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> import static org.apache.spark.sql.functions.*;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> public class Example3 {
>     public static void main(String... args) {
>         // create spark session
>         SparkSession spark = SparkSession.builder()
>                 .appName("example1")
>                 .master("local[4]")
>                 .config("spark.driver.maxResultSize","1g")
>                 .config("spark.driver.memory","512m")
>                 .config("spark.executor.memory","512m")
>                 .config("spark.local.dir","d:/temp/spark-tmp")
>                 .getOrCreate();
>         JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
>         // base to generate huge data
>         List<Integer> list = new ArrayList<>();
>         for (int val = 1; val < 10000; val++) {
>             int valueOf = Integer.valueOf(val);
>             list.add(valueOf);
>         }
>         // create simple rdd of int
>         JavaRDD<Integer> rdd = sc.parallelize(list,200);
>         // use map to create large object per row
>         JavaRDD<Row> rowRDD =
>                 rdd
>                         .map(value -> RowFactory.create(String.valueOf(value), createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
>                         // no persist => out of memory exception on write()
>                         // persist MEMORY_AND_DISK => out of memory exception on write()
>                         // persist MEMORY_AND_DISK_SER => out of memory exception on write()
>                         // persist(StorageLevel.DISK_ONLY())
>                 ;
>         StructType type = new StructType();
>         type = type
>                 .add("c1", DataTypes.StringType)
>                 .add( "c2", DataTypes.StringType );
>         Dataset<Row> df = spark.createDataFrame(rowRDD, type);
>         // works
>         df.show();
>         df = df
>                 .sort(col("c1").asc() )
>             ;
>         df.explain();
>         // takes a lot of time but works
>         df.show();
>         // OutOfMemoryError: java heap space
>         df
>             .write()
>             .mode("overwrite")
>             .csv("d:/temp/my.csv");
>         // OutOfMemoryError: java heap space
>         df
>                 .toJavaRDD()
>                 .mapToPair(row -> new Tuple2<>(new Text(row.getString(0)), new Text(row.getString(1))))
>                 .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class);
>     }
>     private static String createLongText( String text, int minLength ) {
>         StringBuffer sb = new StringBuffer();
>         while( sb.length() < minLength ) {
>             sb.append(text);
>         }
>         return sb.toString();
>     }
> }
> {code}
> When using StorageLevel.MEMORY_AND_DISK(_SER), an OOM crashes the application at around partition 70,
> with heap usage at about 3 GB of the 4 GB available.
> It seems sort does something like a collect: a heap dump shows a very large array of arrays, possibly
> the partition contents. spark.driver.maxResultSize is also involved, so sort exceeds its default value;
> setting it to unlimited still causes an OOM.
> Why do I think this is a bug?
>  # The sort() operation should not involve spark.driver.maxResultSize.
>  # MEMORY_AND_DISK should work at all, and at least the disk should be used, but I see an OOM when
> heap usage reaches 3 GB of the 4 GB total.
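
For context on the workaround mentioned in the report, a minimal sketch of the sortWithinPartitions() variant; it sorts rows only within each partition rather than globally, so no range boundaries are sampled and nothing is collected to the driver. {{df}} stands for the two-column Dataset built in the example above:

{code:java}
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Local (per-partition) sort: no range shuffle, no driver-side sampling,
// and no StorageLevel tuning needed.
Dataset<Row> locallySorted = df.sortWithinPartitions(col("c1").asc());

// Output path is an arbitrary illustration.
locallySorted.write().mode("overwrite").csv("d:/temp/my-local-sort.csv");
{code}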


