spark-reviews mailing list archives

From davies <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-9024] [WIP] Unsafe HashJoin
Date Mon, 20 Jul 2015 16:44:40 GMT
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7480#discussion_r35015733
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
    @@ -148,3 +148,88 @@ private[joins] object HashedRelation {
         }
       }
     }
    +
    +
    +/**
    + * A HashedRelation for UnsafeRow, backed by a HashMap that maps each key to a
    + * sequence of matching rows.
    + *
    + * TODO(davies): use BytesToBytesMap
    + */
    +private[joins] final class UnsafeHashedRelation(
    +    private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]],
    +    private var keyTypes: Array[DataType])
    +  extends HashedRelation with Externalizable {
    +
    +  def this() = this(null, null)  // Needed for serialization
    +
    +  // UnsafeProjection is not thread-safe, so cache one instance per thread
    +  @transient lazy val keyProjection = new ThreadLocal[UnsafeProjection]
    +
    +  override def get(key: InternalRow): CompactBuffer[InternalRow] = {
    +    val unsafeKey = if (key.isInstanceOf[UnsafeRow]) {
    +      key.asInstanceOf[UnsafeRow]
    +    } else {
    +      var proj = keyProjection.get()
    +      if (proj eq null) {
    +        proj = UnsafeProjection.create(keyTypes)
    +        keyProjection.set(proj)
    +      }
    +      proj(key)
    +    }
    +    // rely on type erasure in Scala
    +    hashTable.get(unsafeKey).asInstanceOf[CompactBuffer[InternalRow]]
    +  }
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    writeBytes(out, SparkSqlSerializer.serialize(keyTypes))
    +    val bytes = SparkSqlSerializer.serialize(hashTable)
    +    writeBytes(out, bytes)
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    keyTypes = SparkSqlSerializer.deserialize(readBytes(in))
    +    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
    +  }
    +}
    +
    +private[joins] object UnsafeHashedRelation {
    +
    +  def apply(
    +    input: Iterator[InternalRow],
    +    buildKey: Seq[Expression],
    +    rowSchema: StructType,
    +    sizeEstimate: Int = 64): HashedRelation = {
    +
    +    // TODO: Use BytesToBytesMap.
    +    val hashTable = new JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]](sizeEstimate)
    --- End diff --
    
    I'm thinking we can re-order the values in BytesToBytesMap during serialization.
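
    For illustration, the re-ordering could look roughly like the sketch below: write
    each key once, followed by all of its matching values, so the rows for one key end
    up contiguous in the serialized stream and can be consumed as a block on the read
    side. This is a minimal standalone sketch, not the BytesToBytesMap API; the
    length-prefixed layout and the names GroupedSerializationSketch / serializeGrouped
    are assumptions for the example, with Array[Byte] standing in for raw key and row
    bytes.

        import java.io.{ByteArrayOutputStream, DataOutputStream}

        object GroupedSerializationSketch {

          /** Write each key once, followed by all of its values, so that matching
            * rows are laid out contiguously in the serialized stream. On the read
            * side the values for one key can then be re-inserted as a single block. */
          def serializeGrouped(
              table: Map[Array[Byte], Seq[Array[Byte]]],
              out: DataOutputStream): Unit = {
            out.writeInt(table.size)
            for ((key, values) <- table) {
              out.writeInt(key.length)          // length-prefixed key bytes
              out.write(key)
              out.writeInt(values.size)         // number of rows for this key
              for (v <- values) {               // values re-ordered next to their key
                out.writeInt(v.length)
                out.write(v)
              }
            }
          }

          def main(args: Array[String]): Unit = {
            val table = Map(
              "k1".getBytes("UTF-8") -> Seq("a".getBytes("UTF-8"), "b".getBytes("UTF-8")),
              "k2".getBytes("UTF-8") -> Seq("c".getBytes("UTF-8")))
            val buf = new ByteArrayOutputStream()
            serializeGrouped(table, new DataOutputStream(buf))
            println(s"serialized ${buf.size} bytes")
          }
        }

    Grouping values with their key this way also lets deserialization append each
    key's rows in one pass, instead of probing the map once per row.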


