spark-issues mailing list archives

From "caoxuewen (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-23676) Support left join codegen in SortMergeJoinExec
Date Wed, 14 Mar 2018 07:42:00 GMT
caoxuewen created SPARK-23676:
---------------------------------

             Summary: Support left join codegen in SortMergeJoinExec
                 Key: SPARK-23676
                 URL: https://issues.apache.org/jira/browse/SPARK-23676
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: caoxuewen


This PR generates Java code that implements the LeftOuter case of `SortMergeJoinExec` directly,
instead of consuming the join result through an iterator.
Running the join in generated code improves runtime performance.
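A quick way to confirm that the join runs inside whole-stage codegen is to inspect the physical plan. A minimal spark-shell sketch (the table sizes here are illustrative, not from the benchmark):
```
import org.apache.spark.sql.functions.col

val df1 = spark.range(1000).selectExpr("id as k1")
val df2 = spark.range(1000).selectExpr("id as k2")
val joined = df1.join(df2, col("k1") === col("k2"), "left")
// With this change, SortMergeJoin(LeftOuter) should appear inside a
// WholeStageCodegen node in the plan rather than outside it (the iterator fallback).
joined.explain()
```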

JoinBenchmark result: **1.3x**
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
left sort merge join:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------
left merge join wholestage=off         2439 / 2575          0.9        1163.0       1.0X
left merge join wholestage=on          1890 / 1904          1.1         901.1       1.3X
```
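The `wholestage=off/on` cases correspond to toggling whole-stage code generation via the standard SQL conf. To compare the two paths by hand (a sketch, not the benchmark harness itself, with `df` built as in the benchmark program below):
```
// Compare the iterator path against the generated-code path by toggling
// whole-stage codegen (spark.sql.codegen.wholeStage is the standard conf key).
spark.conf.set("spark.sql.codegen.wholeStage", "false")
df.count()  // iterator-based SortMergeJoin
spark.conf.set("spark.sql.codegen.wholeStage", "true")
df.count()  // code-generated SortMergeJoin
```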
Benchmark program
```
val N = 2 << 20
runBenchmark("left sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N * 10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N * 10} as k2")
  val df = df1.join(df2, col("k1") === col("k2"), "left")
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}
```
Code example
```
val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect
```
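The generated code below can be dumped for this query with Spark's debug helpers (assuming a spark-shell session with the `df1`/`df2` from the example above):
```
import org.apache.spark.sql.execution.debug._

val df = df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left")
// Prints the Java source generated for each whole-stage codegen stage,
// including the SortMergeJoin stage shown below.
df.debugCodegen()
```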
Generated code
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage5(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=5
/* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator smj_leftInput;
/* 010 */ private scala.collection.Iterator smj_rightInput;
/* 011 */ private InternalRow smj_leftRow;
/* 012 */ private InternalRow smj_rightRow;
/* 013 */ private long smj_value2;
/* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 015 */ private long smj_value3;
/* 016 */ private long smj_value4;
/* 017 */ private long smj_value5;
/* 018 */ private long smj_value6;
/* 019 */ private boolean smj_isNull2;
/* 020 */ private long smj_value7;
/* 021 */ private boolean smj_isNull3;
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] smj_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
/* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */ smj_leftInput = inputs[0];
/* 034 */ smj_rightInput = inputs[1];
/* 035 */
/* 036 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 2147483647);
/* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
/* 038 */ smj_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0], 0);
/* 039 */ smj_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0], 4);
/* 040 */
/* 041 */ }
/* 042 */
/* 043 */ private void writeJoinRows() throws java.io.IOException {
/* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
/* 045 */
/* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
/* 047 */
/* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
/* 049 */
/* 050 */ if (smj_isNull2) {
/* 051 */ smj_mutableStateArray2[0].setNullAt(2);
/* 052 */ } else {
/* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
/* 054 */ }
/* 055 */
/* 056 */ if (smj_isNull3) {
/* 057 */ smj_mutableStateArray2[0].setNullAt(3);
/* 058 */ } else {
/* 059 */ smj_mutableStateArray2[0].write(3, smj_value7);
/* 060 */ }
/* 061 */ append(smj_mutableStateArray[0].copy());
/* 062 */
/* 063 */ }
/* 064 */
/* 065 */ private boolean findNextJoinRows(
/* 066 */ scala.collection.Iterator leftIter,
/* 067 */ scala.collection.Iterator rightIter) {
/* 068 */ smj_leftRow = null;
/* 069 */ int comp = 0;
/* 070 */ while (smj_leftRow == null) {
/* 071 */ if (!leftIter.hasNext()) return false;
/* 072 */ smj_leftRow = (InternalRow) leftIter.next();
/* 073 */
/* 074 */ long smj_value = smj_leftRow.getLong(0);
/* 075 */ if (false) {
/* 076 */ if (!smj_matches.isEmpty()) {
/* 077 */ smj_matches.clear();
/* 078 */ }
/* 079 */ return true;
/* 080 */ }
/* 081 */ if (!smj_matches.isEmpty()) {
/* 082 */ comp = 0;
/* 083 */ if (comp == 0) {
/* 084 */ comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0);
/* 085 */ }
/* 086 */
/* 087 */ if (comp == 0) {
/* 088 */ return true;
/* 089 */ }
/* 090 */ smj_matches.clear();
/* 091 */ }
/* 092 */
/* 093 */ do {
/* 094 */ if (smj_rightRow == null) {
/* 095 */ if (!rightIter.hasNext()) {
/* 096 */ smj_value3 = smj_value;
/* 097 */ return true;
/* 098 */ }
/* 099 */ smj_rightRow = (InternalRow) rightIter.next();
/* 100 */
/* 101 */ long smj_value1 = smj_rightRow.getLong(0);
/* 102 */ if (false) {
/* 103 */ smj_rightRow = null;
/* 104 */ continue;
/* 105 */ }
/* 106 */ smj_value2 = smj_value1;
/* 107 */ }
/* 108 */
/* 109 */ comp = 0;
/* 110 */ if (comp == 0) {
/* 111 */ comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? -1 : 0);
/* 112 */ }
/* 113 */
/* 114 */ if (comp > 0) {
/* 115 */ smj_rightRow = null;
/* 116 */ } else if (comp < 0) {
/* 117 */ if (!smj_matches.isEmpty()) {
/* 118 */ smj_value3 = smj_value;
/* 119 */ }
/* 120 */ return true;
/* 121 */ } else {
/* 122 */ smj_matches.add((UnsafeRow) smj_rightRow);
/* 123 */ smj_rightRow = null;
/* 124 */ }
/* 125 */ } while (smj_leftRow != null);
/* 126 */ }
/* 127 */ return false; // unreachable
/* 128 */ }
/* 129 */
/* 130 */ protected void processNext() throws java.io.IOException {
/* 131 */ while (findNextJoinRows(smj_leftInput, smj_rightInput)) {
/* 132 */ boolean smj_loaded = false;
/* 133 */ smj_value4 = smj_leftRow.getLong(0);
/* 134 */ smj_value5 = smj_leftRow.getLong(1);
/* 135 */ scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
/* 136 */ while (smj_iterator.hasNext()) {
/* 137 */ InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
/* 138 */ smj_isNull3 = smj_rightRow1.isNullAt(1);
/* 139 */ smj_value7 = smj_rightRow1.getLong(1);
/* 140 */ boolean smj_isNull4 = true;
/* 141 */ boolean smj_value8 = false;
/* 142 */
/* 143 */ if (!smj_isNull3) {
/* 144 */ smj_isNull4 = false; // resultCode could change nullability.
/* 145 */ smj_value8 = smj_value5 < smj_value7;
/* 146 */
/* 147 */ }
/* 148 */ if (smj_isNull4 || !smj_value8) continue;
/* 149 */ smj_isNull2 = smj_rightRow1.isNullAt(0);
/* 150 */ smj_value6 = smj_rightRow1.getLong(0);
/* 151 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 152 */ smj_loaded = true;
/* 153 */ writeJoinRows();
/* 154 */ }
/* 155 */ if (!smj_loaded) {
/* 156 */ smj_isNull2 = true;
/* 157 */ smj_isNull3 = true;
/* 158 */ writeJoinRows();
/* 159 */ }
/* 160 */ if (shouldStop()) return;
/* 161 */ }
/* 162 */ }
/* 163 */
/* 164 */ }
```
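For orientation, the generated `findNextJoinRows`/`processNext` pair above follows the usual sort-merge pattern: buffer all right rows sharing the current left key into `smj_matches`, emit one output row per buffered match that passes the extra condition, and emit the left row once with nulls on the right side if nothing matched. A simplified Scala sketch of that control flow (hypothetical names, not the real `SortMergeJoinExec` internals):
```
// Simplified sketch of the generated left-outer merge-join loop.
// Illustrative only: names and types here are hypothetical.
def leftOuterMergeJoin[K, L, R](
    left: Iterator[(K, L)],
    right: BufferedIterator[(K, R)],
    cond: (L, R) => Boolean)(implicit ord: Ordering[K]): Iterator[(L, Option[R])] = {
  var bufferedKey: Option[K] = None   // key of the rows currently buffered
  var matches = Vector.empty[R]       // plays the role of smj_matches
  left.flatMap { case (k, l) =>
    // Reuse the buffered matches when consecutive left rows share a key,
    // as the generated code does via smj_value3.
    if (!bufferedKey.exists(ord.equiv(_, k))) {
      while (right.hasNext && ord.lt(right.head._1, k)) right.next()
      matches = Vector.empty
      while (right.hasNext && ord.equiv(right.head._1, k)) matches :+= right.next()._2
      bufferedKey = Some(k)
    }
    val passing = matches.filter(cond(l, _))
    if (passing.isEmpty) Iterator((l, None))        // unmatched: nulls on the right
    else passing.iterator.map(r => (l, Some(r)))
  }
}
```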


