spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davies <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...
Date Thu, 04 Feb 2016 21:47:33 GMT
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180065661
  
    For a join on a single Long key:
    ```
        val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
        sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
    ```
    This generates the following code (gaps in the /* NNN */ line numbering mark omitted lines):
    ```
    /* 001 */
    /* 002 */ public Object generate(Object[] references) {
    /* 003 */   return new GeneratedIterator(references);
    /* 004 */ }
    /* 005 */
    /* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */
    /* 008 */   private Object[] references;
    /* 009 */   private boolean agg_initAgg;
    /* 010 */   private boolean agg_bufIsNull;
    /* 011 */   private long agg_bufValue;
    /* 012 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
    /* 013 */   private org.apache.spark.sql.execution.joins.LongArrayRelation bhj_relation;
    /* 014 */   private boolean range_initRange;
    /* 015 */   private long range_partitionEnd;
    /* 016 */   private long range_number;
    /* 017 */   private boolean range_overflow;
    /* 018 */   private UnsafeRow agg_result;
    /* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 021 */
    /* 022 */   public GeneratedIterator(Object[] references) {
    /* 023 */     this.references = references;
    /* 024 */     agg_initAgg = false;
    /* 025 */
    /* 026 */
    /* 027 */     this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
    /* 028 */
    /* 029 */     bhj_relation = (org.apache.spark.sql.execution.joins.LongArrayRelation) bhj_broadcast.value();
    /* 030 */     incPeakExecutionMemory(bhj_relation.getMemorySize());
    /* 031 */
    /* 032 */     range_initRange = false;
    /* 033 */     range_partitionEnd = 0L;
    /* 034 */     range_number = 0L;
    /* 035 */     range_overflow = false;
    /* 036 */     agg_result = new UnsafeRow(1);
    /* 037 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    /* 038 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
    /* 039 */   }
    /* 040 */
    /* 041 */
    /* 042 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
    /* 043 */     // initialize aggregation buffer
    /* 045 */     agg_bufIsNull = false;
    /* 046 */     agg_bufValue = 0L;
    /* 051 */     // initialize Range
    /* 052 */     if (!range_initRange) {
    /* 053 */       range_initRange = true;
    /* 054 */       if (input.hasNext()) {
    /* 055 */         initRange(((InternalRow) input.next()).getInt(0));
    /* 056 */       } else {
    /* 057 */         return;
    /* 058 */       }
    /* 059 */     }
    /* 060 */
    /* 061 */     while (!range_overflow && range_number < range_partitionEnd) {
    /* 062 */       long range_value = range_number;
    /* 063 */       range_number += 1L;
    /* 064 */       if (range_number < range_value ^ 1L < 0) {
    /* 065 */         range_overflow = true;
    /* 066 */       }
    /* 067 */
    /* 068 */       // generate join key
    /* 069 */       /* cast((input[0, bigint] % 60000) as bigint) */
    /* 070 */       /* (input[0, bigint] % 60000) */
    /* 071 */       boolean bhj_isNull1 = false;
    /* 072 */       long bhj_value1 = -1L;
    /* 073 */       if (false || 60000L == 0) {
    /* 074 */         bhj_isNull1 = true;
    /* 075 */       } else {
    /* 076 */
    /* 077 */         if (false) {
    /* 078 */           bhj_isNull1 = true;
    /* 079 */         } else {
    /* 080 */           bhj_value1 = (long)(range_value % 60000L);
    /* 081 */         }
    /* 082 */       }
    /* 083 */       boolean bhj_isNull = bhj_isNull1;
    /* 084 */       long bhj_value = -1L;
    /* 085 */       if (!bhj_isNull1) {
    /* 086 */         bhj_value = bhj_value1;
    /* 087 */       }
    /* 088 */       // find matches from HashedRelation
    /* 089 */       UnsafeRow bhj_matched = bhj_isNull ? null:(UnsafeRow)bhj_relation.getValue(bhj_value);
    /* 090 */       if (bhj_matched != null) {
    /* 091 */         /* input[0, bigint] */
    /* 092 */         long bhj_value4 = bhj_matched.getLong(0);
    /* 097 */         // do aggregate
    /* 098 */         /* (input[0, bigint] + 1) */
    /* 099 */         long agg_value1 = -1L;
    /* 100 */         agg_value1 = agg_bufValue + 1L;
    /* 101 */         // update aggregation buffer
    /* 102 */         agg_bufIsNull = false;
    /* 103 */         agg_bufValue = agg_value1;
    /* 106 */       }
    /* 109 */       if (shouldStop()) return;
    /* 110 */     }
    /* 113 */   }
    /* 116 */   private void initRange(int idx) {
    /* 141 */   }
    /* 142 */
    /* 143 */
    /* 144 */   protected void processNext() throws java.io.IOException {
    /* 145 */     if (!agg_initAgg) {
    /* 146 */       agg_initAgg = true;
    /* 147 */       agg_doAggregateWithoutKey();
    /* 148 */
    /* 149 */       // output the result
    /* 150 */
    /* 151 */
    /* 152 */       agg_rowWriter.zeroOutNullBytes();
    /* 153 */
    /* 154 */
    /* 155 */       if (agg_bufIsNull) {
    /* 156 */         agg_rowWriter.setNullAt(0);
    /* 157 */       } else {
    /* 158 */         agg_rowWriter.write(0, agg_bufValue);
    /* 159 */       }
    /* 160 */       currentRows.add(agg_result.copy());
    /* 161 */     }
    /* 162 */   }
    /* 163 */ }
    /* 164 */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message