From issues-return-187026-archive-asf-public=cust-asf.ponee.io@spark.apache.org Wed Mar 14 08:42:11 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 30021180654 for ; Wed, 14 Mar 2018 08:42:11 +0100 (CET) Received: (qmail 50945 invoked by uid 500); 14 Mar 2018 07:42:05 -0000 Mailing-List: contact issues-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list issues@spark.apache.org Received: (qmail 50936 invoked by uid 99); 14 Mar 2018 07:42:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Mar 2018 07:42:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 331C7C050B for ; Wed, 14 Mar 2018 07:42:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.511 X-Spam-Level: X-Spam-Status: No, score=-109.511 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id tAsLKa3XtYEm for ; Wed, 14 Mar 2018 07:42:02 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 7C44F5F216 for ; Wed, 14 Mar 2018 07:42:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server 
at mailrelay1-us-west.apache.org) with ESMTP id 7DDC9E0142 for ; Wed, 14 Mar 2018 07:42:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 258072148D for ; Wed, 14 Mar 2018 07:42:00 +0000 (UTC) Date: Wed, 14 Mar 2018 07:42:00 +0000 (UTC) From: "caoxuewen (JIRA)" To: issues@spark.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Created] (SPARK-23676) Support left join codegen in SortMergeJoinExec MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 caoxuewen created SPARK-23676: --------------------------------- Summary: Support left join codegen in SortMergeJoinExec Key: SPARK-23676 URL: https://issues.apache.org/jira/browse/SPARK-23676 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: caoxuewen This PR generates java code to directly complete the function of LeftOuter in `SortMergeJoinExec` without using an iterator. This PR improves runtime performance by generating Java code directly. 
joinBenchmark result: **1.3x** ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1 Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz left sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------ left merge join wholestage=off 2439 / 2575 0.9 1163.0 1.0X left merge join wholestage=on 1890 / 1904 1.1 901.1 1.3X ``` Benchmark program ``` val N = 2 << 20 runBenchmark("left sort merge join", N) { val df1 = sparkSession.range(N) .selectExpr(s"(id * 15485863) % ${N*10} as k1") val df2 = sparkSession.range(N) .selectExpr(s"(id * 15485867) % ${N*10} as k2") val df = df1.join(df2, col("k1") === col("k2"), "left") assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined) df.count() ``` code example ``` val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1") val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2") df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect ``` Generated code ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage5(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=5 /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator smj_leftInput; /* 010 */ private scala.collection.Iterator smj_rightInput; /* 011 */ private InternalRow smj_leftRow; /* 012 */ private InternalRow smj_rightRow; /* 013 */ private long smj_value2; /* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches; /* 015 */ private long smj_value3; /* 016 */ private long smj_value4; /* 017 */ private long smj_value5; /* 018 */ private long smj_value6; /* 019 */ private boolean smj_isNull2; /* 020 */ private 
long smj_value7; /* 021 */ private boolean smj_isNull3; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] smj_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1]; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1]; /* 025 */ /* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ smj_leftInput = inputs[0]; /* 034 */ smj_rightInput = inputs[1]; /* 035 */ /* 036 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 2147483647); /* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4); /* 038 */ smj_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0], 0); /* 039 */ smj_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0], 4); /* 040 */ /* 041 */ } /* 042 */ /* 043 */ private void writeJoinRows() throws java.io.IOException { /* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes(); /* 045 */ /* 046 */ smj_mutableStateArray2[0].write(0, smj_value4); /* 047 */ /* 048 */ smj_mutableStateArray2[0].write(1, smj_value5); /* 049 */ /* 050 */ if (smj_isNull2) { /* 051 */ smj_mutableStateArray2[0].setNullAt(2); /* 052 */ } else { /* 053 */ smj_mutableStateArray2[0].write(2, smj_value6); /* 054 */ } /* 055 */ /* 056 */ if (smj_isNull3) { /* 057 */ smj_mutableStateArray2[0].setNullAt(3); /* 058 */ } else { /* 059 */ smj_mutableStateArray2[0].write(3, smj_value7); /* 060 */ } /* 061 */ 
append(smj_mutableStateArray[0].copy()); /* 062 */ /* 063 */ } /* 064 */ /* 065 */ private boolean findNextJoinRows( /* 066 */ scala.collection.Iterator leftIter, /* 067 */ scala.collection.Iterator rightIter) { /* 068 */ smj_leftRow = null; /* 069 */ int comp = 0; /* 070 */ while (smj_leftRow == null) { /* 071 */ if (!leftIter.hasNext()) return false; /* 072 */ smj_leftRow = (InternalRow) leftIter.next(); /* 073 */ /* 074 */ long smj_value = smj_leftRow.getLong(0); /* 075 */ if (false) { /* 076 */ if (!smj_matches.isEmpty()) { /* 077 */ smj_matches.clear(); /* 078 */ } /* 079 */ return true; /* 080 */ } /* 081 */ if (!smj_matches.isEmpty()) { /* 082 */ comp = 0; /* 083 */ if (comp == 0) { /* 084 */ comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0); /* 085 */ } /* 086 */ /* 087 */ if (comp == 0) { /* 088 */ return true; /* 089 */ } /* 090 */ smj_matches.clear(); /* 091 */ } /* 092 */ /* 093 */ do { /* 094 */ if (smj_rightRow == null) { /* 095 */ if (!rightIter.hasNext()) { /* 096 */ smj_value3 = smj_value; /* 097 */ return true; /* 098 */ } /* 099 */ smj_rightRow = (InternalRow) rightIter.next(); /* 100 */ /* 101 */ long smj_value1 = smj_rightRow.getLong(0); /* 102 */ if (false) { /* 103 */ smj_rightRow = null; /* 104 */ continue; /* 105 */ } /* 106 */ smj_value2 = smj_value1; /* 107 */ } /* 108 */ /* 109 */ comp = 0; /* 110 */ if (comp == 0) { /* 111 */ comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? 
-1 : 0); /* 112 */ } /* 113 */ /* 114 */ if (comp > 0) { /* 115 */ smj_rightRow = null; /* 116 */ } else if (comp < 0) { /* 117 */ if (!smj_matches.isEmpty()) { /* 118 */ smj_value3 = smj_value; /* 119 */ } /* 120 */ return true; /* 121 */ } else { /* 122 */ smj_matches.add((UnsafeRow) smj_rightRow); /* 123 */ smj_rightRow = null; /* 124 */ } /* 125 */ } while (smj_leftRow != null); /* 126 */ } /* 127 */ return false; // unreachable /* 128 */ } /* 129 */ /* 130 */ protected void processNext() throws java.io.IOException { /* 131 */ while (findNextJoinRows(smj_leftInput, smj_rightInput)) { /* 132 */ boolean smj_loaded = false; /* 133 */ smj_value4 = smj_leftRow.getLong(0); /* 134 */ smj_value5 = smj_leftRow.getLong(1); /* 135 */ scala.collection.Iterator smj_iterator = smj_matches.generateIterator(); /* 136 */ while (smj_iterator.hasNext()) { /* 137 */ InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next(); /* 138 */ smj_isNull3 = smj_rightRow1.isNullAt(1); /* 139 */ smj_value7 = smj_rightRow1.getLong(1); /* 140 */ boolean smj_isNull4 = true; /* 141 */ boolean smj_value8 = false; /* 142 */ /* 143 */ if (!smj_isNull3) { /* 144 */ smj_isNull4 = false; // resultCode could change nullability. 
/* 145 */ smj_value8 = smj_value5 < smj_value7; /* 146 */ /* 147 */ } /* 148 */ if (smj_isNull4 || !smj_value8) continue; /* 149 */ smj_isNull2 = smj_rightRow1.isNullAt(0); /* 150 */ smj_value6 = smj_rightRow1.getLong(0); /* 151 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1); /* 152 */ smj_loaded = true; /* 153 */ writeJoinRows(); /* 154 */ } /* 155 */ if (!smj_loaded) { /* 156 */ smj_isNull2 = true; /* 157 */ smj_isNull3 = true; /* 158 */ writeJoinRows(); /* 159 */ } /* 160 */ if (shouldStop()) return; /* 161 */ } /* 162 */ } /* 163 */ /* 164 */ } ``` -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org