spark-issues mailing list archives

From "Andrew Korzhuev (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming
Date Wed, 28 Mar 2018 14:52:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Korzhuev updated SPARK-23682:
------------------------------------
    Attachment: Screen Shot 2018-03-28 at 16.44.20.png

> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak
>         Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream with aggregation (dropDuplicates()) and data partitioning constantly increases memory usage, and the executors eventually fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType),
"YYYY"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, which is referenced from [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption shouldn't increase, but that is not the case. Moreover, GC time took more than 30% of the total processing time.
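>
> For reference, a sketch of what bounding this state might look like (not the pipeline above verbatim; it assumes testTimestamp is an event-time timestamp column, and the 1-hour delay threshold is only an illustrative value). Adding a watermark and including the event-time column in dropDuplicates() should let Spark evict old duplicate keys from the state store:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     // declare how late a duplicate may arrive; state older than the watermark becomes droppable
>     .withWatermark("testTimestamp", "1 hour")
>     // the event-time column has to be part of the dedup key for old state to be evicted
>     .dropDuplicates("testId", "testName", "testTimestamp")
>     ...{quote}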



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


