spark-issues mailing list archives

From "Quentin Auge (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-20227) Job hangs when joining a lot of aggregated columns
Date Fri, 07 Apr 2017 15:22:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960953#comment-15960953 ]

Quentin Auge edited comment on SPARK-20227 at 4/7/17 3:22 PM:
--------------------------------------------------------------

Well, you were right to ask. After further investigation, it seems the job does not hang forever,
but takes much longer to finish when n is large enough.

I was mistaken when I said the job finished in a reasonable amount of time for n = 20. In fact, it starts to slow down dramatically from n = 12 onward (that is, 11 successive aggregations).

See the following timings:
!https://image.ibb.co/fxRL85/figure_1.png|n on x axis, time for the job to complete in seconds
on y axis!
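For reference, timings like those in the plot can be gathered with a simple wall-clock harness along these lines (a sketch; {{run_job}} is a hypothetical stand-in for launching the repro script with a given number of columns, since the real job runs on the EMR cluster):

```python
import time

def measure(job, ns):
    """Run job(n) for each n and record wall-clock seconds."""
    timings = []
    for n in ns:
        start = time.perf_counter()
        job(n)
        timings.append((n, time.perf_counter() - start))
    return timings

# Hypothetical stand-in for the Spark repro; replace with a call
# that submits the script from the issue description.
def run_job(n):
    total = 0
    for i in range(n * 10000):
        total += i
    return total

timings = measure(run_job, [5, 10, 11, 12, 15, 20])
```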

Most of the extra time the job takes from n = 12 onward is spent after this message:
{code}
17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0)
17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1.
17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0)
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None)
17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver.
17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0)
{code}

Right after that, the job starts again and completes.



> Job hangs when joining a lot of aggregated columns
> --------------------------------------------------
>
>                 Key: SPARK-20227
>                 URL: https://issues.apache.org/jira/browse/SPARK-20227
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: AWS emr-5.4.0, master: m4.xlarge, core: 4 m4.xlarge
>            Reporter: Quentin Auge
>
> I'm trying to replace many columns in a dataframe with window aggregates of themselves, and then join the resulting dataframe with another one.
> {code}
> # Imports needed to run this snippet in a PySpark shell
> from pyspark.sql import Row, Window
> from pyspark.sql.functions import sum  # shadows the builtin sum
> # Create a dataframe with 1 row and 50 columns
> n = 50
> df = sc.parallelize([Row(*range(n))]).toDF()
> cols = df.columns
> # Replace each column's values with aggregates of themselves
> window = Window.partitionBy(cols[0])
> for col in cols[1:]:
>     df = df.withColumn(col, sum(col).over(window))
> # Join with a single-row dataframe on the first column
> other_df = sc.parallelize([Row(0)]).toDF()
> result = other_df.join(df, on=cols[0])
> result.show()
> {code}
> Spark hangs forever when executing the last line. The strange thing is, it depends on the number of columns: Spark does not hang for n = 5, 10, or 20 columns, but for n = 50 and beyond it does.
> {code}
> 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0)
> 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1.
> 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0)
> 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
> 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None)
> 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
> 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver.
> 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0)
> {code}
> All executors are idle and thus killed after 60 seconds, the master keeps spending CPU on a process that hangs indefinitely, and the workers sit idle.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

