spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davies <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-10585][SQL] only copy data once when ge...
Date Thu, 01 Oct 2015 20:33:29 GMT
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8747#discussion_r40962693
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -393,10 +393,278 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
         case _ => input
       }
     
    +  val rowWriterClass = classOf[UnsafeRowWriter].getName
    +  val arrayWriterClass = classOf[UnsafeArrayWriter].getName
    +
    +  // todo: if the nullability of field is correct, we can use it to save null check.
    +  private def writeStructToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      fieldTypes: Seq[DataType],
    +      bufferHolder: String): String = {
    +    val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
    +      val fieldName = ctx.freshName("fieldName")
    +      val code = s"final ${ctx.javaType(dt)} $fieldName = ${ctx.getValue(input, dt, i.toString)};"
    +      val isNull = s"$input.isNullAt($i)"
    +      GeneratedExpressionCode(code, isNull, fieldName)
    +    }
    +
    +    s"""
    +      if ($input instanceof UnsafeRow) {
    +        $rowWriterClass.directWrite($bufferHolder, (UnsafeRow) $input);
    +      } else {
    +        ${writeExpressionsToBuffer(ctx, input, fieldEvals, fieldTypes, bufferHolder)}
    +      }
    +    """
    +  }
    +
    +  private def writeExpressionsToBuffer(
    +      ctx: CodeGenContext,
    +      row: String,
    +      inputs: Seq[GeneratedExpressionCode],
    +      inputTypes: Seq[DataType],
    +      bufferHolder: String): String = {
    +    val rowWriter = ctx.freshName("rowWriter")
    +    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +
    +    val writeFields = inputs.zip(inputTypes).zipWithIndex.map {
    +      case ((input, dt), index) =>
    +        val tmpCursor = ctx.freshName("tmpCursor")
    +
    +        val setNull = dt match {
    +          case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
    +            // Can't call setNullAt() for DecimalType with precision larger than 18.
    +            s"$rowWriter.write($index, null, 0, 0);"
    +          case _ => s"$rowWriter.setNullAt($index);"
    +        }
    +
    +        val writeField = dt match {
    +          case t: StructType =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeStructToBuffer(ctx, input.primitive, t.map(_.dataType), bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case a @ ArrayType(et, _) =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeArrayToBuffer(ctx, input.primitive, et, bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
    +              $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case m @ MapType(kt, vt, _) =>
    +            s"""
    +              // Remember the current cursor so that we can calculate how many bytes are
    +              // written later.
    +              final int $tmpCursor = $bufferHolder.cursor;
    +              ${writeMapToBuffer(ctx, input.primitive, kt, vt, bufferHolder)}
    +              $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
    +              $rowWriter.alignWords($bufferHolder.cursor - $tmpCursor);
    +            """
    +
    +          case _ if ctx.isPrimitiveType(dt) =>
    +            val fieldOffset = ctx.freshName("fieldOffset")
    +            s"""
    +              final long $fieldOffset = $rowWriter.getFieldOffset($index);
    +              Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L);
    +              ${writePrimitiveType(ctx, input.primitive, dt, s"$bufferHolder.buffer", fieldOffset)}
    +            """
    +
    +          case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
    +            s"$rowWriter.writeCompactDecimal($index, ${input.primitive}, " +
    +              s"${t.precision}, ${t.scale});"
    +
    +          case t: DecimalType =>
    +            s"$rowWriter.write($index, ${input.primitive}, ${t.precision}, ${t.scale});"
    +
    +          case NullType => ""
    +
    +          case _ => s"$rowWriter.write($index, ${input.primitive});"
    +        }
    +
    +        s"""
    +          ${input.code}
    +          if (${input.isNull}) {
    +            $setNull
    +          } else {
    +            $writeField
    +          }
    +        """
    +    }
    +
    +    s"""
    +      $rowWriter.initialize($bufferHolder, ${inputs.length});
    +      ${ctx.splitExpressions(row, writeFields)}
    +    """
    +  }
    +
    +  // todo: if the nullability of array element is correct, we can use it to save null check.
    +  private def writeArrayToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      elementType: DataType,
    +      bufferHolder: String,
    +      needHeader: Boolean = true): String = {
    +    val arrayWriter = ctx.freshName("arrayWriter")
    +    ctx.addMutableState(arrayWriterClass, arrayWriter,
    +      s"this.$arrayWriter = new $arrayWriterClass();")
    +    val numElements = ctx.freshName("numElements")
    +    val index = ctx.freshName("index")
    +    val element = ctx.freshName("element")
    +
    +    val jt = ctx.javaType(elementType)
    +
    +    val fixedElementSize = elementType match {
    +      case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
    +      case _ if ctx.isPrimitiveType(jt) => elementType.defaultSize
    +      case _ => 0
    +    }
    +
    +    val writeElement = elementType match {
    +      case t: StructType =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeStructToBuffer(ctx, element, t.map(_.dataType), bufferHolder)}
    +        """
    +
    +      case a @ ArrayType(et, _) =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeArrayToBuffer(ctx, element, et, bufferHolder)}
    +        """
    +
    +      case m @ MapType(kt, vt, _) =>
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
    +        """
    +
    +      case _ if ctx.isPrimitiveType(elementType) =>
    +        // Should we do word align?
    +        val dataSize = elementType.defaultSize
    +
    +        s"""
    +          $arrayWriter.setOffset($index);
    +          ${writePrimitiveType(ctx, element, elementType,
    +            s"$bufferHolder.buffer", s"$bufferHolder.cursor")}
    +          $bufferHolder.cursor += $dataSize;
    +        """
    +
    +      case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
    +        s"$arrayWriter.writeCompactDecimal($index, $element, ${t.precision}, ${t.scale});"
    +
    +      case t: DecimalType =>
    +        s"$arrayWriter.write($index, $element, ${t.precision}, ${t.scale});"
    +
    +      case NullType => ""
    +
    +      case _ => s"$arrayWriter.write($index, $element);"
    +    }
    +
    +    s"""
    +      if ($input instanceof UnsafeArrayData) {
    +        $arrayWriterClass.directWrite($bufferHolder, (UnsafeArrayData) $input, $needHeader);
    +      } else {
    +        final int $numElements = $input.numElements();
    +        $arrayWriter.initialize($bufferHolder, $numElements, $needHeader, $fixedElementSize);
    +
    +        for (int $index = 0; $index < $numElements; $index++) {
    +          if ($input.isNullAt($index)) {
    +            $arrayWriter.setNullAt($index);
    +          } else {
    +            final $jt $element = ${ctx.getValue(input, elementType, index)};
    +            $writeElement
    +          }
    +        }
    +      }
    +    """
    +  }
    +
    +  // todo: if the nullability of value element is correct, we can use it to save null check.
    +  private def writeMapToBuffer(
    +      ctx: CodeGenContext,
    +      input: String,
    +      keyType: DataType,
    +      valueType: DataType,
    +      bufferHolder: String): String = {
    +    val keys = ctx.freshName("keys")
    +    val values = ctx.freshName("values")
    +    val tmpCursor = ctx.freshName("tmpCursor")
    +
    +    s"""
    +      final ArrayData $keys = $input.keyArray();
    +      final ArrayData $values = $input.valueArray();
    +
    +      // Write the numElements into first 4 bytes.
    +      Platform.putInt($bufferHolder.buffer, $bufferHolder.cursor, $keys.numElements());
    +
    +      $bufferHolder.cursor += 8;
    +      // Remember the current cursor so that we can write numBytes of key array later.
    +      final int $tmpCursor = $bufferHolder.cursor;
    +
    +      ${writeArrayToBuffer(ctx, keys, keyType, bufferHolder, needHeader = false)}
    +      // Write the numBytes of key array into second 4 bytes.
    +      Platform.putInt($bufferHolder.buffer, $tmpCursor - 4, $bufferHolder.cursor - $tmpCursor);
    --- End diff --
    
    Never mind, I was wrong.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message