spark-issues mailing list archives

From "Pierre Lienhart (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-26116) Spark SQL - Sort when writing partitioned parquet lead to OOM errors
Date Mon, 19 Nov 2018 13:12:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-26116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Lienhart updated SPARK-26116:
------------------------------------
    Description: 
When writing partitioned parquet using {{partitionBy}}, it looks like Spark sorts each partition before writing, but this sort consumes a huge amount of memory compared to the size of the data. The executors can then go OOM and get killed by YARN. As a consequence, it also forces us to provision a huge amount of memory compared to the size of the data being written.
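
For context, the write pattern that triggers this behavior is the plain dynamic-partition write sketched below (the column name and paths are placeholders for illustration, not taken from the actual job):

{code:scala}
// Minimal sketch of the write pattern described above.
// "event_date" and the input/output paths are placeholders, not from the real job.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partitioned-parquet-write").getOrCreate()

val df = spark.read.parquet("/data/input")   // any DataFrame containing the partition column

df.write
  .mode("overwrite")
  .partitionBy("event_date")                 // each task sorts its rows by partition key before writing
  .parquet("/data/output/partitioned")
{code}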

Error messages found in the Spark UI look like the following:

 
{code:java}
Spark UI description of failure : Job aborted due to stage failure: Task 169 in stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 (TID 98, xxxxxxxxx.xxxxxx.xxxxx.xx, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
{code}
 
{code:java}
Job aborted due to stage failure: Task 66 in stage 4.0 failed 1 times, most recent failure: Lost task 66.0 in stage 4.0 (TID 56, xxxxxxx.xxxxx.xxxxx.xx, executor 1): org.apache.spark.SparkException: Task failed while writing rows
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
         at org.apache.spark.scheduler.Task.run(Task.scala:99)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@75194804 : /app/hadoop/yarn/local/usercache/at053351/appcache/application_1537536072724_17039/blockmgr-a4ba7d59-e780-4385-99b4-a4c4fe95a1ec/25/temp_local_a542a412-5845-45d2-9302-bbf5ee4113ad (No such file or directory)
         at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
         at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:254)
         at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:347)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:425)
         at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:160)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:364)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1353)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
         ... 8 more{code}

In the stderr logs, we can see that a huge amount of sort data (the partition being sorted here is 250 MB when persisted in memory, deserialized) is being spilled to disk ({{INFO UnsafeExternalSorter: Thread 155 spilling sort data of 3.6 GB to disk}}). Sometimes the data is spilled to disk in time and the sort completes ({{INFO FileFormatWriter: Sorting complete. Writing out partition files one at a time.}}), but sometimes it does not, and we see multiple {{TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.}} messages until the application finally goes OOM with logs such as {{ERROR UnsafeExternalSorter: Unable to grow the pointer array}}.

I should mention that when looking at individual (successful) write tasks in the Spark UI,
the Peak Execution Memory metric is always 0.  

This looks like a known issue: SPARK-12546 is explicitly related and led to a PR that decreased the default value of {{spark.sql.sources.maxConcurrentWrites}} from 5 to 1. The [Spark 1.6.0 release notes|https://spark.apache.org/releases/spark-release-1-6-0.html] also mention this problem as a “Known Issue” and, as described in SPARK-12546, advise tweaking both {{spark.memory.fraction}} and {{spark.hadoop.parquet.memory.pool.ratio}}, without any explanation of how this should help (and tweaking these values does indeed help).
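
For reference, here is a sketch of how these settings, together with the {{spark.yarn.executor.memoryOverhead}} increase suggested by the first error message, can be set when building the session. The values are illustrative placeholders only, not tuned recommendations, and they can equally be passed with {{--conf}} on {{spark-submit}}:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative values only -- placeholders, not tuned recommendations.
val spark = SparkSession.builder()
  .appName("partitioned-parquet-write")
  // Suggested by the YARN "Container killed ... exceeding memory limits" message (value in MB)
  .config("spark.yarn.executor.memoryOverhead", "2048")
  // Settings mentioned in SPARK-12546 and the Spark 1.6.0 release notes
  .config("spark.memory.fraction", "0.4")
  .config("spark.hadoop.parquet.memory.pool.ratio", "0.3")
  .getOrCreate()
{code}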

Could we at least enhance the documentation on this issue? It would be really helpful for me to understand what is happening in terms of memory so that I can better size my application and/or choose the most appropriate memory parameters. Still, how can the sort generate that much data?

I am running Spark 2.1.1 and do not know whether I would encounter this issue in later versions.

Many thanks,

Pierre LIENHART



> Spark SQL - Sort when writing partitioned parquet lead to OOM errors
> --------------------------------------------------------------------
>
>                 Key: SPARK-26116
>                 URL: https://issues.apache.org/jira/browse/SPARK-26116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: Pierre Lienhart
>            Priority: Major


