[ https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Petar Zecevic updated SPARK-24020:

Description:
The problem we are solving is the case where you have two big tables partitioned by a column X,
but also sorted within partitions by a column Y, and you need to calculate an expensive function
on the joined rows (e.g. a condition based on a spatial distance calculation) that reduces the
number of output rows. You could theoretically reduce the number of joined rows for which the
calculation itself is performed by using a range condition on the Y column. Something
like this:
{{... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND <function calculation...>}}
However, during a sort-merge join with this range condition specified, Spark will first cross-join
all the rows with the same X value and only then try to apply the range condition and any
function calculations. This happens because, inside the generated sort-merge join (SMJ) code,
these extra conditions are put in the same block as the function being calculated, and there
is no way to evaluate them before reading all the rows to be checked into memory
(into an {{ExternalAppendOnlyUnsafeRowArray}}). If the two tables have a large number of rows
per X, this can result in a huge number of calculations and a huge number of rows in executor
memory, which can be infeasible.
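To make the cost concrete, here is a minimal pure-Python sketch (an illustration only, not Spark internals) of what the current SMJ effectively does for one join key: every right-side row with the same X is buffered, and the range condition is only evaluated afterwards, so the number of condition evaluations is |left| × |right| per key:

```python
# Illustration of the current behaviour for a single equi-join key X:
# all same-X right rows are buffered before the range condition is checked.

def naive_smj_per_key(left_ys, right_ys, d):
    """Join two sequences of Y values that share the same X key."""
    buffered = list(right_ys)           # all same-X rows held in memory
    comparisons = 0
    out = []
    for ly in left_ys:
        for ry in buffered:             # full cross product per key
            comparisons += 1
            if ry - d <= ly <= ry + d:  # range condition applied late
                out.append((ly, ry))
    return out, comparisons

pairs, n = naive_smj_per_key(range(100), range(100), d=1)
# 100 x 100 = 10_000 comparisons, even though only ~300 pairs qualify
```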
h3. The solution implementation
We therefore propose a change to the sort-merge join so that, when these extra conditions
are specified, a queue is used instead of the {{ExternalAppendOnlyUnsafeRowArray}} class. This
queue is then used as a moving window over the values from the right relation as the left
row changes. You could call this a combination of an equi-join and a theta join; in the literature
it is sometimes called an "epsilon join". We call it a "sort-merge inner range join".
This design uses much less memory (not all rows with the same value of X need to be loaded
into memory at once) and requires far fewer comparisons (the validity of this
statement depends on the actual data and conditions used).
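As an illustration only (the actual proposal operates on UnsafeRows inside Spark's SMJ execution, not Python lists), the moving-window idea can be sketched like this for a single equi-join key, with both sides sorted ascending by Y:

```python
from collections import deque

# Sketch of the proposed moving-window ("inner range") merge for one
# equi-join key X. Both inputs must be sorted ascending by Y; the deque
# stands in for the proposed queue of right-side rows.

def inner_range_join_per_key(left_ys, right_ys, d):
    """Emit (ly, ry) pairs where ly - d <= ry <= ly + d."""
    window = deque()
    it = iter(right_ys)
    nxt = next(it, None)
    out = []
    for ly in left_ys:                  # left rows in sorted Y order
        # Evict right rows that fell below the window: ry < ly - d.
        # They can never match a later (larger) left Y either.
        while window and window[0] < ly - d:
            window.popleft()
        # Pull right rows into the window while ry <= ly + d.
        while nxt is not None and nxt <= ly + d:
            if nxt >= ly - d:
                window.append(nxt)
            nxt = next(it, None)
        for ry in window:               # only candidate rows are compared
            out.append((ly, ry))
    return out

pairs = inner_range_join_per_key([1, 2, 5], [0, 1, 3, 6], d=1)
# -> [(1, 0), (1, 1), (2, 1), (2, 3), (5, 6)]
```

Because a row enters and leaves the window at most once, the work beyond the emitted pairs is linear in the two inputs, instead of quadratic per key.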
h3. The classes that need to be changed
For implementing the described change we propose changes to these classes:
* _ExtractEquiJoinKeys_ – a pattern that needs to be extended to recognize the
case where a simple range condition with lower and upper limits is used on a secondary column
(a column not included in the equi-join condition). The pattern also needs to extract the
information later required for code generation etc.
* _InMemoryUnsafeRowQueue_ – the moving-window implementation to be used instead of the
_ExternalAppendOnlyUnsafeRowArray_ class. Rows need to be added to and removed from the
structure as the left key (X) or the left secondary value (Y) changes, so the structure
needs to be a queue. To make the change as unintrusive as possible, we propose to implement
_InMemoryUnsafeRowQueue_ as a subclass of _ExternalAppendOnlyUnsafeRowArray_.
* _JoinSelection_ – a strategy that uses _ExtractEquiJoinKeys_ and needs to be aware of
the extracted range conditions
* _SortMergeJoinExec_ – the main implementation of the optimization. Needs to support two
code paths:
** when whole-stage code generation is turned off (method doExecute, which uses sortMergeJoinInnerRangeScanner)
** when whole-stage code generation is turned on (methods doProduce and genScanner)
* _SortMergeJoinInnerRangeScanner_ – implements the SMJ with the inner-range optimization in
the case when whole-stage codegen is turned off
* _InnerJoinSuite_ – functional tests
* _JoinBenchmark_ – performance tests
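The queue semantics described above for _InMemoryUnsafeRowQueue_ can be sketched in plain Python as an analogy (the real class would subclass _ExternalAppendOnlyUnsafeRowArray_ and hold UnsafeRows; the class and method names below are illustrative, not the proposed API):

```python
from collections import deque

class InMemoryRowQueue:
    """Toy analogue of the proposed InMemoryUnsafeRowQueue: a window of
    right-side Y values kept valid for the current left Y and distance d."""

    def __init__(self, d):
        self.d = d
        self.q = deque()

    def evict_below(self, left_y):
        # Drop rows that can no longer match: ry < left_y - d.
        while self.q and self.q[0] < left_y - self.d:
            self.q.popleft()

    def admit(self, right_y, left_y):
        # Accept a row only while it is within the upper bound ry <= left_y + d.
        if right_y <= left_y + self.d:
            self.q.append(right_y)
            return True
        return False  # caller stops pulling right rows for this left row

    def clear(self):
        # On a change of the equi-join key X, the window is reset.
        self.q.clear()
```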
h3. Triggering the optimization
The optimization should be triggered automatically when an equi-join expression is present
AND lower and upper range conditions on a secondary column are specified. If the tables aren't
sorted by both columns, appropriate sorts should be added.
To limit the impact of this change we also propose adding a new parameter (tentatively named
"spark.sql.join.smj.useInnerRangeOptimization") which could be used to switch off the optimization
entirely.
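As a usage sketch (the parameter name is only tentative, per this proposal, and {{spark}} is assumed to be an existing SparkSession), switching the optimization off would look like:

```python
# Hypothetical: the flag name below is tentative per this proposal,
# and `spark` is assumed to be an already-created SparkSession.
spark.conf.set("spark.sql.join.smj.useInnerRangeOptimization", "false")
```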
h3. Applicable use cases
Potential use cases for this are joins based on spatial or temporal distance calculations.
> Sort-merge join inner range optimization
> 
>
> Key: SPARK-24020
> URL: https://issues.apache.org/jira/browse/SPARK-24020
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Petar Zecevic
> Priority: Major
> Attachments: SMJinnerRangePR24020designDoc.pdf
>
>

This message was sent by Atlassian JIRA
(v7.6.3#76005)

To unsubscribe, email: issues-unsubscribe@spark.apache.org
For additional commands, email: issues-help@spark.apache.org
