spark-issues mailing list archives

From "vidmantas zemleris (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-4644) Implement skewed join
Date Wed, 30 Sep 2015 15:53:14 GMT

    [ https://issues.apache.org/jira/browse/SPARK-4644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14937033#comment-14937033 ]

vidmantas zemleris edited comment on SPARK-4644 at 9/30/15 3:53 PM:
--------------------------------------------------------------------

IMHO a good start would be to implement the special case of a left join with NULL values in the join
conditions. That would be much simpler and more efficient than the proposed generic
skew join implementation [1].

Currently we're only interested in this special case of NULL values. Consider:

{code:sql}
SELECT * FROM t1
LEFT JOIN t2 
ON (t1.nullableCol = t2.nullableCol AND t1.portal = t2.portal)
{code}

The problem is that if the join key contains a column with many NULL values (and it is the
only join column, or the cardinality of the other join columns is low), all of those rows get
shuffled to one or a few tasks, causing stragglers or outright failures.
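
To make the effect concrete, here is a minimal sketch in plain Scala (no Spark needed). The `partitionOf` function is a toy stand-in for a hash partitioner, not Spark's actual code, and all names in it are made up for illustration. A NULL key is modelled as `None` and always lands in the same partition, so the share of NULL rows directly sets the size of the largest task:

```scala
object NullKeySkewDemo {
  // Toy stand-in for a hash partitioner (an assumption for illustration):
  // a NULL key, modelled as None, always goes to partition 0, while other
  // keys are spread by their hashCode.
  def partitionOf(key: Option[String], numPartitions: Int): Int = key match {
    case None    => 0
    case Some(k) => Math.floorMod(k.hashCode, numPartitions)
  }

  // Number of rows each partition receives for the given join keys.
  def partitionSizes(keys: Seq[Option[String]], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    // 900 rows with a NULL key and 100 rows with distinct non-NULL keys.
    val keys = Seq.fill(900)(Option.empty[String]) ++ (1 to 100).map(i => Some(s"user$i"))
    val sizes = partitionSizes(keys, numPartitions = 8)
    sizes.toSeq.sortBy(_._1).foreach { case (p, n) => println(s"partition $p: $n rows") }
  }
}
```

With a second, low-cardinality join column (such as `portal` above) the NULL rows would be spread over only as many partitions as that column has distinct values.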

A naive solution could be along these lines:

{code:scala}
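import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

// leftKeys: the left-side columns used in joinConditions, passed explicitly because
// a Column gives no public way to list the columns it references.
def nullAwareLeftJoin(left: DataFrame, right: DataFrame,
                      joinConditions: Column, leftKeys: Seq[Column]): DataFrame = {
  val hasNullKey = leftKeys.map(_.isNull).reduce(_ || _)
  // Rows with a NULL join key can never match, so keep them out of the join.
  val rowsWithNulls = left.filter(hasNullKey)
  val safeRows = left
    .filter(!hasNullKey)
    .join(right, joinConditions, "left_outer") // keep unmatched left rows too

  // Pad the NULL-key rows with typed NULLs for every right-side column.
  val nullPadding = right.schema.fields.map(f => lit(null).cast(f.dataType).as(f.name))
  safeRows.unionAll(rowsWithNulls.select(left.columns.map(left(_)) ++ nullPadding: _*))
}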
{code}
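
As a sanity check of the split-and-union idea, here is a toy model on plain Scala collections rather than DataFrames. Everything in it (`NullAwareLeftJoinModel`, the row shapes, the reference `leftJoin`) is a hypothetical illustration. It only shows that routing NULL-key rows around the join gives the same rows as an ordinary left outer join, under the usual SQL rule that NULL never equals anything:

```scala
object NullAwareLeftJoinModel {
  // Rows are modelled as (key, payload); a NULL key is None.
  type L = (Option[Int], String)
  type R = (Int, String)
  type Out = (Option[Int], String, Option[String])

  // Reference: ordinary left outer join. A None key never matches,
  // so it always yields a single row padded with None.
  def leftJoin(left: Seq[L], right: Seq[R]): Seq[Out] =
    left.flatMap { case (k, lv) =>
      val matches = right.collect { case (rk, rv) if k.contains(rk) => rv }
      if (matches.isEmpty) Seq((k, lv, Option.empty[String]))
      else matches.map(rv => (k, lv, Option(rv)))
    }

  // Null-aware variant: NULL-key rows bypass the join and are padded directly.
  def nullAwareLeftJoin(left: Seq[L], right: Seq[R]): Seq[Out] = {
    val (safeRows, rowsWithNulls) = left.partition(_._1.isDefined)
    leftJoin(safeRows, right) ++
      rowsWithNulls.map { case (k, lv) => (k, lv, Option.empty[String]) }
  }
}
```

Row order differs between the two versions, which matches `unionAll` semantics: the NULL-key rows are simply appended at the end.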

This could likely be made more efficient if implemented internally...


What do you think, guys?
--

[1] https://github.com/tresata/spark-skewjoin 

> A skew join is just like a normal join except that keys with large amounts of values
are not processed by a single task but instead spread out across many tasks. This is achieved
by replicating key-value pairs for one side of the join in such way that they go to multiple
tasks...
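
For comparison, the replication idea described in that quote can be sketched on plain Scala collections. This is not the spark-skewjoin implementation: the `saltedJoin` helper, the fixed `replicas` count, and the random salt are assumptions made for illustration only.

```scala
import scala.util.Random

object SaltedJoinSketch {
  // Plain inner join of two key-value collections.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val byKey = right.groupBy(_._1)
    for {
      (k, a) <- left
      (_, b) <- byKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  // Salted join: each left row gets a random salt in [0, replicas) and every
  // right row is replicated once per salt, so one hot key becomes `replicas`
  // distinct join keys that different tasks could process.
  def saltedJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)],
                          replicas: Int, rng: Random): Seq[(K, (A, B))] = {
    val saltedLeft = left.map { case (k, a) => ((k, rng.nextInt(replicas)), a) }
    val saltedRight = for { (k, b) <- right; salt <- 0 until replicas } yield ((k, salt), b)
    // Join on (key, salt), then drop the salt again.
    join(saltedLeft, saltedRight).map { case ((k, _), ab) => (k, ab) }
  }
}
```

The cost is that one side is copied `replicas` times, which is why the NULL-only special case is cheaper: NULL keys never match, so nothing needs to be replicated at all.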


> Implement skewed join
> ---------------------
>
>                 Key: SPARK-4644
>                 URL: https://issues.apache.org/jira/browse/SPARK-4644
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Shixiong Zhu
>         Attachments: Skewed Join Design Doc.pdf
>
>
> Skewed data is not rare. For example, a book recommendation site may have several books
which are liked by most of the users. Running ALS on such skewed data will raise an OutOfMemory
error if some book has too many users to fit into memory. To solve it, we propose
a skewed join implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


