spark-issues mailing list archives

From "Nikolaos Tsipas (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21935) Pyspark UDF causing ExecutorLostFailure
Date Wed, 06 Sep 2017 10:49:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikolaos Tsipas updated SPARK-21935:
------------------------------------
    Description: 
Hi,

I'm using Spark 2.1.0 on AWS EMR (YARN) and trying to use a UDF in Python as follows:
{code}
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

path = 's3://some/parquet/dir/myfile.parquet'
df = spark.read.load(path)
def _test_udf(useragent):
    return useragent.upper()

test_udf = udf(_test_udf, StringType())
df = df.withColumn('test_field', test_udf(col('std_useragent')))
df.write.parquet('/output.parquet')
{code}

The following config is used in {{spark-defaults.conf}} (set by {{maximizeResourceAllocation}} in EMR):
{code}
...
spark.executor.instances         4
spark.executor.cores             8
spark.driver.memory              8G
spark.executor.memory            9658M
spark.default.parallelism        64
spark.driver.maxResultSize       3G
...
{code}
The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPUs, 15 GiB memory, 160 GB SSD storage.

The above example fails every single time with errors like the following:
{code}
17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
{code}
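
The 10.4 GB limit in that message seems to be the sum of {{spark.executor.memory}} and the default {{spark.yarn.executor.memoryOverhead}} (10% of executor memory, with a 384 MB floor, in Spark 2.1). A quick back-of-the-envelope check, assuming that default formula:
{code}
# Rough sanity check of the YARN container cap reported above,
# assuming the default overhead formula max(0.10 * executorMemory, 384) MB.
executor_memory_mb = 9658                               # spark.executor.memory from spark-defaults.conf
overhead_mb = max(int(executor_memory_mb * 0.10), 384)  # default spark.yarn.executor.memoryOverhead (~965 MB)
container_mb = executor_memory_mb + overhead_mb
print(container_mb, round(container_mb / 1024.0, 1))    # ~10623 MB, i.e. the ~10.4 GB cap YARN reports
{code}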

I tried increasing {{spark.yarn.executor.memoryOverhead}} to 3000, which delays the errors, but they still appear before the end of the job and the job eventually fails.
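
For completeness, this is roughly how the 3000 MB override can be applied; a minimal sketch as an extra {{spark-defaults.conf}} entry (it could equally be passed with {{--conf}} at submit time):
{code}
...
spark.yarn.executor.memoryOverhead   3000
...
{code}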

!Screen Shot 2017-09-06 at 11.31.31.png|width=800!

If I run the equivalent job in Scala, everything works as expected (without having to adjust the memoryOverhead):

{code}
import org.apache.spark.sql.functions.{col, udf}

val upper: String => String = _.toUpperCase
val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
val upperUDF = udf(upper)
val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
newdf.write.parquet("/output.parquet")
{code}

!Screen Shot 2017-09-06 at 11.31.13.png|width=800!

CPU utilisation is also very poor with PySpark:

!Screen Shot 2017-09-06 at 11.30.28.png|width=800!

Is this a known issue with PySpark and UDFs, or is it a matter of bad configuration?
Looking forward to suggestions. Thanks!




> Pyspark UDF causing ExecutorLostFailure 
> ----------------------------------------
>
>                 Key: SPARK-21935
>                 URL: https://issues.apache.org/jira/browse/SPARK-21935
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.0
>            Reporter: Nikolaos Tsipas
>              Labels: pyspark, udf
>         Attachments: Screen Shot 2017-09-06 at 11.30.28.png, Screen Shot 2017-09-06 at 11.31.13.png, Screen Shot 2017-09-06 at 11.31.31.png
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
