spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davies <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...
Date Tue, 02 Feb 2016 20:09:52 GMT
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178795082
  
    ```
    val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
    sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    ```
    
    ```
    /* 001 */
    /* 002 */ public Object generate(Object[] references) {
    /* 003 */   return new GeneratedIterator(references);
    /* 004 */ }
    /* 005 */
    /* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */
    /* 008 */   private Object[] references;
    /* 009 */   private boolean agg_initAgg0;
    /* 010 */   private boolean agg_bufIsNull1;
    /* 011 */   private long agg_bufValue2;
    /* 012 */   private org.apache.spark.broadcast.TorrentBroadcast broadcasthashjoin_broadcast6;
    /* 013 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation broadcasthashjoin_relation7;
    /* 014 */   private boolean range_initRange8;
    /* 015 */   private long range_partitionEnd9;
    /* 016 */   private long range_number10;
    /* 017 */   private boolean range_overflow11;
    /* 018 */   private UnsafeRow broadcasthashjoin_result19;
    /* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder broadcasthashjoin_holder20;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
broadcasthashjoin_rowWriter21;
    /* 021 */   private UnsafeRow agg_result37;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder38;
    /* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
agg_rowWriter39;
    /* 024 */
    /* 025 */   private void initRange(int idx) {
    /* 050 */   }
    /* 051 */
    /* 052 */
    /* 053 */   private void agg_doAggregateWithoutKey5() throws java.io.IOException {
    /* 054 */     // initialize aggregation buffer
    
    /* 057 */     agg_bufIsNull1 = false;
    /* 058 */     agg_bufValue2 = 0L;
    
    /* 063 */     // initialize Range
    /* 064 */     if (!range_initRange8) {
    /* 065 */       range_initRange8 = true;
    /* 066 */       if (input.hasNext()) {
    /* 067 */         initRange(((InternalRow) input.next()).getInt(0));
    /* 068 */       } else {
    /* 069 */         return;
    /* 070 */       }
    /* 071 */     }
    /* 072 */
    /* 073 */     while (!range_overflow11 && range_number10 < range_partitionEnd9) {
    /* 074 */       long range_value12 = range_number10;
    /* 075 */       range_number10 += 1L;
    /* 076 */       if (range_number10 < range_value12 ^ 1L < 0) {
    /* 077 */         range_overflow11 = true;
    /* 078 */       }
    /* 079 */
    /* 080 */       // generate join key
    /* 084 */       broadcasthashjoin_rowWriter21.zeroOutNullBytes();
    /* 089 */       boolean broadcasthashjoin_isNull13 = false;
    /* 090 */       long broadcasthashjoin_value14 = -1L;
    /* 091 */       if (false || 60000L == 0) {
    /* 092 */         broadcasthashjoin_isNull13 = true;
    /* 093 */       } else {
    /* 094 */         /* input[0, bigint] */
    /* 095 */
    /* 096 */         if (false) {
    /* 097 */           broadcasthashjoin_isNull13 = true;
    /* 098 */         } else {
    /* 099 */           broadcasthashjoin_value14 = (long)(range_value12 % 60000L);
    /* 100 */         }
    /* 101 */       }
    /* 102 */       if (broadcasthashjoin_isNull13) {
    /* 103 */         broadcasthashjoin_rowWriter21.setNullAt(0);
    /* 104 */       } else {
    /* 105 */         broadcasthashjoin_rowWriter21.write(0, broadcasthashjoin_value14);
    /* 106 */       }
    /* 107 */
    /* 108 */
    /* 109 */       // find matches from HashRelation
    /* 110 */       org.apache.spark.util.collection.CompactBuffer broadcasthashjoin_matches23 = broadcasthashjoin_result19.anyNull() ? null : (org.apache.spark.util.collection.CompactBuffer) broadcasthashjoin_relation7.get(broadcasthashjoin_result19);
    /* 111 */       if (broadcasthashjoin_matches23 != null) {
    /* 112 */         int broadcasthashjoin_size25 = broadcasthashjoin_matches23.size();
    /* 113 */         for (int broadcasthashjoin_i24 = 0; broadcasthashjoin_i24 < broadcasthashjoin_size25;
broadcasthashjoin_i24++) {
    /* 114 */           UnsafeRow broadcasthashjoin_row26 = (UnsafeRow) broadcasthashjoin_matches23.apply(broadcasthashjoin_i24);
    /* 115 */           /* input[0, bigint] */
    /* 116 */           long broadcasthashjoin_value28 = broadcasthashjoin_row26.getLong(0);
    /* 121 */           // do aggregate
    /* 127 */           long agg_value30 = -1L;
    /* 128 */           agg_value30 = agg_bufValue2 + 1L;
    /* 129 */           // update aggregation buffer
    /* 130 */
    /* 131 */           agg_bufIsNull1 = false;
    /* 132 */           agg_bufValue2 = agg_value30;
    /* 136 */         }
    /* 137 */       }
    /* 138 */
    /* 139 */
    /* 140 */       if (shouldStop()) return;
    /* 141 */     }
    /* 144 */   }
    /* 145 */
    /* 146 */
    /* 147 */   public GeneratedIterator(Object[] references) {
    /* 148 */     this.references = references;
    /* 149 */     agg_initAgg0 = false;
    /* 150 */
    /* 151 */
    /* 152 */     this.broadcasthashjoin_broadcast6 = (org.apache.spark.broadcast.TorrentBroadcast)
references[0];
    /* 153 */
    /* 154 */     broadcasthashjoin_relation7 = (org.apache.spark.sql.execution.joins.UnsafeHashedRelation)
broadcasthashjoin_broadcast6.value();
    /* 155 */     incPeakExecutionMemory(broadcasthashjoin_relation7.getUnsafeSize());
    /* 156 */
    /* 157 */     range_initRange8 = false;
    /* 158 */     range_partitionEnd9 = 0L;
    /* 159 */     range_number10 = 0L;
    /* 160 */     range_overflow11 = false;
    /* 161 */     broadcasthashjoin_result19 = new UnsafeRow(1);
    /* 162 */     this.broadcasthashjoin_holder20 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(broadcasthashjoin_result19,
0);
    /* 163 */     this.broadcasthashjoin_rowWriter21 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(broadcasthashjoin_holder20,
1);
    /* 164 */     agg_result37 = new UnsafeRow(1);
    /* 165 */     this.agg_holder38 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result37,
0);
    /* 166 */     this.agg_rowWriter39 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder38,
1);
    /* 167 */   }
    /* 168 */
    /* 169 */   protected void processNext() throws java.io.IOException {
    /* 170 */
    /* 171 */     if (!agg_initAgg0) {
    /* 172 */       agg_initAgg0 = true;
    /* 173 */       agg_doAggregateWithoutKey5();
    /* 174 */
    /* 175 */       // output the result
    /* 179 */       agg_rowWriter39.zeroOutNullBytes();
    /* 183 */       if (agg_bufIsNull1) {
    /* 184 */         agg_rowWriter39.setNullAt(0);
    /* 185 */       } else {
    /* 186 */         agg_rowWriter39.write(0, agg_bufValue2);
    /* 187 */       }
    /* 188 */       currentRows.add(agg_result37.copy());
    /* 190 */     }
    /* 192 */   }
    /* 193 */ }
    /* 194 */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message