spark-issues mailing list archives

From "Petar Zecevic (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24020) Sort-merge join inner range optimization
Date Thu, 28 Jun 2018 15:05:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Petar Zecevic updated SPARK-24020:
----------------------------------
    Description: 
The problem we are solving is the case where you have two big tables partitioned by column X
and also sorted by column Y within partitions, and you need to calculate an expensive function
on the joined rows that reduces the number of output rows (e.g. a condition based on a spatial
distance calculation). In principle, you could reduce the number of joined rows on which the
calculation is performed by adding a range condition on column Y, something like this:

{{... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND <function calculation...>}}

However, when a sort-merge join has this range condition, Spark first cross-joins all the
rows with the same X value and only then applies the range condition and any function
calculations. This happens because, in the generated sort-merge join (SMJ) code, these extra
conditions are placed in the same block as the function being calculated, so there is no way
to evaluate them before all the rows to be checked have been read into memory
(into an {{ExternalAppendOnlyUnsafeRowArray}}). If the two tables have a large number of rows
per X value, this can result in a huge number of calculations and a huge number of rows in
executor memory, which can make the join infeasible.
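
To make the cost concrete, here is a minimal Java sketch of the current behaviour as described above. It is a simplified model and not Spark's generated code. Several details are assumptions made for illustration: rows are hypothetical (x, y) integer pairs, both inputs are sorted by (x, y), and a plain ArrayList stands in for _ExternalAppendOnlyUnsafeRowArray_. The sketch counts how many candidate pairs the residual condition is evaluated on.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the current behaviour (illustrative only): rows are (x, y) int
// pairs, both inputs are sorted by (x, y), and an ArrayList stands in for
// ExternalAppendOnlyUnsafeRowArray.
// Join condition: left.x = right.x AND left.y BETWEEN right.y - d AND right.y + d.
public class NaiveSmjSketch {
    record Row(int x, int y) {}

    // candidatePairs: pairs on which the residual condition is evaluated.
    record Stats(long candidatePairs, int maxBuffered, int matches) {}

    static Stats join(List<Row> left, List<Row> right, int d) {
        long candidatePairs = 0;
        int maxBuffered = 0;
        int matches = 0;
        int l = 0;
        int r = 0;
        while (l < left.size()) {
            int x = left.get(l).x();
            // Skip right rows whose key is smaller than the current left key.
            while (r < right.size() && right.get(r).x() < x) r++;
            // Buffer ALL right rows with the same key, regardless of y.
            List<Row> buffer = new ArrayList<>();
            while (r < right.size() && right.get(r).x() == x) buffer.add(right.get(r++));
            maxBuffered = Math.max(maxBuffered, buffer.size());
            // Each left row with this key is checked against the whole buffer.
            while (l < left.size() && left.get(l).x() == x) {
                Row lr = left.get(l++);
                for (Row rr : buffer) {
                    candidatePairs++;
                    if (lr.y() >= rr.y() - d && lr.y() <= rr.y() + d) matches++;
                }
            }
        }
        return new Stats(candidatePairs, maxBuffered, matches);
    }

    public static void main(String[] args) {
        List<Row> left = List.of(new Row(1, 0), new Row(1, 10), new Row(1, 20), new Row(1, 30));
        List<Row> right = List.of(new Row(1, 1), new Row(1, 11), new Row(1, 21), new Row(1, 31));
        // prints Stats[candidatePairs=16, maxBuffered=4, matches=4]
        System.out.println(join(left, right, 2));
    }
}
```

With four rows per side sharing X = 1 and d = 2, all 16 pairs are checked and all 4 right rows are buffered, even though only 4 pairs satisfy the range condition. In general, the number of checked pairs is the product of the two group sizes for each X value.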
h3. The solution implementation

We therefore propose changing the sort-merge join so that, when these extra conditions
are specified, a queue is used instead of the ExternalAppendOnlyUnsafeRowArray class. This
queue then serves as a moving window over the rows of the right relation as the left
row changes. This can be seen as a combination of an equi-join and a theta join; in the literature
it is sometimes called an “epsilon join”. We call it a "sort-merge inner range join".
This design uses much less memory (not all rows with the same value of X need to be loaded
into memory at once) and requires far fewer comparisons (how many fewer depends on the
actual data and conditions used).
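
The moving window can be sketched roughly as follows. This is a simplified Java illustration and not the proposed _SortMergeJoinInnerRangeScanner_. It makes three assumptions: rows are hypothetical (x, y) integer pairs, both sides are sorted by (x, y), and a java.util.ArrayDeque plays the role of the proposed _InMemoryUnsafeRowQueue_. The condition t1.Y BETWEEN t2.Y - d AND t2.Y + d is rewritten as t2.Y BETWEEN t1.Y - d AND t1.Y + d. The window then grows at the back while the right Y is at most the left Y + d, and shrinks at the front while the right Y is below the left Y - d.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of a sort-merge "inner range" (epsilon) join, for illustration.
// Assumptions: rows are (x, y) int pairs and both inputs are sorted by (x, y).
// Join condition: left.x = right.x AND left.y BETWEEN right.y - d AND right.y + d,
// i.e. right.y in [left.y - d, left.y + d]. ArrayDeque stands in for the
// proposed InMemoryUnsafeRowQueue.
public class RangeSmjSketch {
    record Row(int x, int y) {}
    record Pair(Row left, Row right) {}

    static List<Pair> join(List<Row> left, List<Row> right, int d) {
        List<Pair> out = new ArrayList<>();
        ArrayDeque<Row> window = new ArrayDeque<>();
        int r = 0;
        boolean first = true;
        int currentKey = 0;
        for (Row l : left) {
            if (first || l.x() != currentKey) {
                // Equi-join key changed: rows buffered for the old key can never match again.
                window.clear();
                currentKey = l.x();
                first = false;
                while (r < right.size() && right.get(r).x() < currentKey) r++;
            }
            // Grow at the back: same key and right.y <= left.y + d.
            while (r < right.size() && right.get(r).x() == currentKey
                    && right.get(r).y() <= l.y() + d) {
                window.addLast(right.get(r++));
            }
            // Shrink at the front: right.y < left.y - d cannot match this or any later left row.
            while (!window.isEmpty() && window.peekFirst().y() < l.y() - d) {
                window.pollFirst();
            }
            // Every row still in the window satisfies the range condition, so an
            // expensive residual predicate would only be evaluated on these pairs.
            for (Row rr : window) out.add(new Pair(l, rr));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> left = List.of(new Row(1, 0), new Row(1, 10), new Row(1, 20), new Row(1, 30));
        List<Row> right = List.of(new Row(1, 1), new Row(1, 11), new Row(1, 21), new Row(1, 31));
        List<Pair> pairs = join(left, right, 2);
        System.out.println(pairs.size()); // 4
        pairs.forEach(System.out::println);
    }
}
```

For the sample data in main, each left row sees a window of only one right row. That yields 4 candidate pairs instead of the 16 that a full per-key buffer would produce. As the proposal notes, the actual saving depends on d and on the distribution of Y.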
h3. The classes that need to be changed 

To implement the described change, we propose changes to these classes:
* _ExtractEquiJoinKeys_ – a pattern that needs to be extended to recognize the
case where a simple range condition with lower and upper limits is used on a secondary column
(a column not included in the equi-join condition). The pattern also needs to extract the
information later required for code generation etc.
* _InMemoryUnsafeRowQueue_ – the moving-window implementation to be used instead of the
_ExternalAppendOnlyUnsafeRowArray_ class. Rows need to be added to and removed from the
structure as the left key (X) or the left secondary value (Y) changes, so the structure
needs to be a queue. To keep the change as unintrusive as possible, we propose to implement
_InMemoryUnsafeRowQueue_ as a subclass of _ExternalAppendOnlyUnsafeRowArray_.
* _JoinSelection_ – a strategy that uses _ExtractEquiJoinKeys_ and needs to be aware of
the extracted range conditions.
* _SortMergeJoinExec_ – the main implementation of the optimization. It needs to support two
code paths:
** when whole-stage code generation is turned off (method doExecute, which uses sortMergeJoinInnerRangeScanner)
** when whole-stage code generation is turned on (methods doProduce and genScanner)
* _SortMergeJoinInnerRangeScanner_ – implements the SMJ with inner-range optimization
when whole-stage codegen is turned off
* _InnerJoinSuite_ – functional tests
* _JoinBenchmark_ – performance tests
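
To illustrate the kind of pattern matching the _ExtractEquiJoinKeys_ extension would need, here is a heavily simplified Java sketch. Catalyst expressions are far richer than this. In the sketch, each conjunct of the join condition is reduced to a hypothetical form left.<col> <op> right.<col> + <offset>. The names Conjunct, RangeCondition and extract are illustrative only and are not existing Spark APIs. The sketch also only handles a range whose column has the same name on both sides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the pattern matching proposed for
// ExtractEquiJoinKeys. Each conjunct is "left.<leftCol> <op> right.<rightCol> + <offset>".
public class RangeExtractionSketch {
    enum Op { EQ, GE, LE, OTHER }

    record Conjunct(String leftCol, Op op, String rightCol, int offset) {}

    // right.column + lowerOffset <= left.column <= right.column + upperOffset
    record RangeCondition(String column, int lowerOffset, int upperOffset) {}

    record Extracted(List<String> equiKeys, Optional<RangeCondition> range, List<Conjunct> residual) {}

    static boolean isEquiKey(Conjunct c) {
        return c.op() == Op.EQ && c.offset() == 0 && c.leftCol().equals(c.rightCol());
    }

    // A secondary column compares the same column on both sides and is not an equi-join key.
    static boolean isSecondary(Conjunct c, List<String> equiKeys) {
        return c.leftCol().equals(c.rightCol()) && !equiKeys.contains(c.leftCol());
    }

    static Extracted extract(List<Conjunct> conjuncts) {
        List<String> equiKeys = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (isEquiKey(c)) equiKeys.add(c.leftCol());
        }
        // Look for a lower (GE) and an upper (LE) bound on the same secondary column.
        Optional<RangeCondition> range = Optional.empty();
        List<Conjunct> used = new ArrayList<>();
        search:
        for (Conjunct lo : conjuncts) {
            if (lo.op() != Op.GE || !isSecondary(lo, equiKeys)) continue;
            for (Conjunct hi : conjuncts) {
                if (hi.op() == Op.LE && isSecondary(hi, equiKeys) && hi.leftCol().equals(lo.leftCol())) {
                    range = Optional.of(new RangeCondition(lo.leftCol(), lo.offset(), hi.offset()));
                    used.add(lo);
                    used.add(hi);
                    break search;
                }
            }
        }
        // Everything that is neither an equi-join key nor part of the range stays residual.
        List<Conjunct> residual = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (!isEquiKey(c) && !used.contains(c)) residual.add(c);
        }
        return new Extracted(equiKeys, range, residual);
    }

    public static void main(String[] args) {
        // t1.X = t2.X AND t1.Y BETWEEN t2.Y - 5 AND t2.Y + 5 AND <expensive predicate>
        List<Conjunct> cond = List.of(
            new Conjunct("x", Op.EQ, "x", 0),
            new Conjunct("y", Op.GE, "y", -5),
            new Conjunct("y", Op.LE, "y", 5),
            new Conjunct("geom", Op.OTHER, "geom", 0));
        Extracted e = extract(cond);
        System.out.println(e.equiKeys());        // [x]
        System.out.println(e.range());           // Optional[RangeCondition[column=y, lowerOffset=-5, upperOffset=5]]
        System.out.println(e.residual().size()); // 1
    }
}
```

If a join condition has only one of the two bounds, no range condition is extracted and that bound stays in the residual predicate. This matches the proposal's requirement that both lower and upper limits be present.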

h3. Triggering the optimization

The optimization should be triggered automatically when an equi-join expression is present
AND lower and upper range conditions on a secondary column are specified. If the inputs are not
already sorted by both the equi-join keys and the secondary column (in that order), the
appropriate sorts should be added.

To limit the impact of this change, we also propose adding a new configuration parameter
(tentatively named "spark.sql.join.smj.useInnerRangeOptimization") that can be used to switch
off the optimization entirely.
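
The triggering rules and the proposed switch could look roughly like the following Java sketch. The configuration key is the tentative name from this proposal and is not an existing Spark setting. Modeling the configuration as a Map, and defaulting the flag to "true", are assumptions made only for illustration. The prefix check is a simplified stand-in for the planner's real ordering check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the planning decision. CONF_KEY is the tentative name from
// the proposal, not an existing Spark setting; the "true" default is an assumption.
public class TriggerSketch {
    static final String CONF_KEY = "spark.sql.join.smj.useInnerRangeOptimization";

    // Use the inner-range SMJ only if enabled, with equi-join keys AND a two-sided range.
    static boolean useInnerRange(Map<String, String> conf, List<String> equiKeys, String rangeColumn) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(CONF_KEY, "true"));
        return enabled && !equiKeys.isEmpty() && rangeColumn != null;
    }

    // Each side must be sorted by the equi-join keys first, then by the range column.
    static List<String> requiredOrdering(List<String> equiKeys, String rangeColumn) {
        List<String> order = new ArrayList<>(equiKeys);
        order.add(rangeColumn);
        return order;
    }

    // A child whose ordering starts with the required ordering needs no extra sort.
    static boolean needsSort(List<String> childOrdering, List<String> required) {
        return childOrdering.size() < required.size()
            || !childOrdering.subList(0, required.size()).equals(required);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("x");
        System.out.println(useInnerRange(Map.of(), keys, "y"));                  // true
        System.out.println(useInnerRange(Map.of(CONF_KEY, "false"), keys, "y")); // false
        List<String> req = requiredOrdering(keys, "y");
        System.out.println(needsSort(List.of("x"), req));      // true: sorted by x only
        System.out.println(needsSort(List.of("x", "y"), req)); // false
    }
}
```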

h3. Applicable use cases

Potential use cases are joins based on spatial or temporal distance calculations.

 


> Sort-merge join inner range optimization
> ----------------------------------------
>
>                 Key: SPARK-24020
>                 URL: https://issues.apache.org/jira/browse/SPARK-24020
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Petar Zecevic
>            Priority: Major
>         Attachments: SMJ-innerRange-PR24020-designDoc.pdf
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
