spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tejasapatil <...@git.apache.org>
Subject [GitHub] spark pull request #15047: [SPARK-17495] [SQL] Add Hash capability semantically equivalent to Hive's
Date Wed, 28 Sep 2016 07:05:31 GMT
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15047#discussion_r80848979
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -559,3 +607,219 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
       override def foldable: Boolean = true
       override def nullable: Boolean = false
     }
    +
    +/**
    + * Simulates Hive's hashing function at
    + * org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode() in Hive
    + *
    + * We should use this hash function for both shuffle and bucket of Hive tables, so that
    + * we can guarantee shuffle and bucketing have same data distribution
    + *
    + * TODO: Support Decimal and date related types
    + */
    +@ExpressionDescription(
    +  usage = "_FUNC_(a1, a2, ...) - Returns a hash value of the arguments.")
    +case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
    +  override val seed = 0
    +
    +  override def dataType: DataType = IntegerType
    +
    +  override def prettyName: String = "hive-hash"
    +
    +  override protected def hasherClassName: String = classOf[HiveHasher].getName
    +
    +  override protected def computeHash(value: Any, dataType: DataType, seed: Int): Int = {
    +    HiveHashFunction.hash(value, dataType, seed).toInt
    +  }
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    ev.isNull = "false"
    +    val childHash = ctx.freshName("childHash")
    +    val childrenHash = children.map { child =>
    +      val childGen = child.genCode(ctx)
    +      childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
    +        computeHash(childGen.value, child.dataType, childHash, ctx)
    +      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
    +    }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
    +
    +    ev.copy(code = s"""
    +      ${ctx.javaType(dataType)} ${ev.value} = $seed;
    +      $childrenHash""")
    +  }
    +
    +  @tailrec
    +  private def computeHash(
    +      input: String,
    +      dataType: DataType,
    +      result: String,
    +      ctx: CodegenContext): String = {
    +    val hasher = hasherClassName
    +
    +    dataType match {
    +      case NullType => ""
    +      case BooleanType => genHashBoolean(input, hasher, result)
    +      case ByteType | ShortType | IntegerType | DateType => genHashInt(input, hasher, result)
    +      case LongType | TimestampType => genHashLong(input, hasher, result)
    +      case FloatType => genHashFloat(input, hasher, result)
    +      case DoubleType => genHashDouble(input, hasher, result)
    +      case d: DecimalType => genHashDecimal(ctx, d, input, hasher, result)
    +      case CalendarIntervalType => genHashCalendarInterval(input, hasher, result)
    +      case BinaryType => genHashBytes(input, hasher, result)
    +      case StringType => genHashString(input, hasher, result)
    +      case ArrayType(et, containsNull) => genHashForArray(ctx, input, result, et, containsNull)
    +      case MapType(kt, vt, valueContainsNull) =>
    +        genHashForMap(ctx, input, result, kt, vt, valueContainsNull)
    +      case StructType(fields) => genHashForStruct(ctx, input, result, fields)
    +      case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx)
    +    }
    +  }
    +
    +  override def eval(input: InternalRow): Int = {
    +    var hash = seed
    +    var i = 0
    +    val len = children.length
    +    while (i < len) {
    +      hash = (31 * hash) + computeHash(children(i).eval(input), children(i).dataType, hash)
    +      i += 1
    +    }
    +    hash
    +  }
    +
    +  override protected def genHashInt(i: String, hasher: String, result: String): String =
    +    s"$result = $hasher.hashInt($i, 0);"
    +
    +  override protected def genHashLong(l: String, hasher: String, result: String): String =
    +    s"$result = $hasher.hashLong($l, 0);"
    +
    +  override protected def genHashBytes(b: String, hasher: String, result: String): String =
    +    s"$result = $hasher.hashUnsafeBytes($b, Platform.BYTE_ARRAY_OFFSET, $b.length, 0);"
    +
    +  override protected def genHashForArray(
    +      ctx: CodegenContext,
    +      input: String,
    +      result: String,
    +      elementType: DataType,
    +      containsNull: Boolean): String = {
    +    val index = ctx.freshName("index")
    +    val childResult = ctx.freshName("childResult")
    +    s"""
    +        int $childResult = 0;
    +        for (int $index = 0; $index < $input.numElements(); $index++) {
    +          $childResult = 0;
    +          ${nullSafeElementHash(input, index, containsNull, elementType, childResult, ctx)};
    +          $result = (31 * $result) + $childResult;
    +        }
    +      """
    +  }
    +
    +  override protected def genHashForMap(
    +      ctx: CodegenContext,
    +      input: String,
    +      result: String,
    +      keyType: DataType,
    +      valueType: DataType,
    +      valueContainsNull: Boolean): String = {
    +    val index = ctx.freshName("index")
    +    val keys = ctx.freshName("keys")
    +    val values = ctx.freshName("values")
    +    val keyResult = ctx.freshName("keyResult")
    +    val valueResult = ctx.freshName("valueResult")
    +    s"""
    +        final ArrayData $keys = $input.keyArray();
    +        final ArrayData $values = $input.valueArray();
    +        int $keyResult = 0;
    +        int $valueResult = 0;
    +        for (int $index = 0; $index < $input.numElements(); $index++) {
    +          $keyResult = 0;
    +          ${nullSafeElementHash(keys, index, false, keyType, keyResult, ctx)}
    +          $valueResult = 0;
    +          ${nullSafeElementHash(values, index, valueContainsNull, valueType, valueResult, ctx)}
    +          $result += $keyResult ^ $valueResult;
    +        }
    +      """
    +  }
    +
    +  override protected def genHashForStruct(
    +      ctx: CodegenContext,
    +      input: String,
    +      result: String,
    +      fields: Array[StructField]): String = {
    +    val localResult = ctx.freshName("localResult")
    +    val childResult = ctx.freshName("childResult")
    +    fields.zipWithIndex.map { case (field, index) =>
    +      s"""
    +         $childResult = 0;
    +         ${nullSafeElementHash(input, index.toString, field.nullable, field.dataType,
    +           childResult, ctx)}
    +         $localResult = (31 * $localResult) + $childResult;
    +       """
    +    }.mkString(
    +      s"""
    +         int $localResult = 0;
    +         int $childResult = 0;
    +       """,
    +      "",
    +      s"$result = (31 * $result) + $localResult;"
    +    )
    +  }
    +}
    +
    +object HiveHashFunction extends InterpretedHashFunction {
    +  override protected def hashInt(i: Int, seed: Long): Long = {
    +    HiveHasher.hashInt(i, seed)
    +  }
    +
    +  override protected def hashLong(l: Long, seed: Long): Long = {
    +    HiveHasher.hashLong(l, seed)
    +  }
    +
    +  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
    +    HiveHasher.hashUnsafeBytes(base, offset, len, seed)
    +  }
    +
    +  override def hash(value: Any, dataType: DataType, seed: Long): Long = {
    +    value match {
    +      case null => 0
    +      case array: ArrayData =>
    +        val elementType = dataType match {
    +          case udt: UserDefinedType[_] => udt.sqlType.asInstanceOf[ArrayType].elementType
    +          case ArrayType(et, _) => et
    +        }
    +        var result: Int = 0
    +        for (i <- 0 until array.numElements()) {
    --- End diff --
    
    changed this everywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message