spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [spark] kiszk commented on a change in pull request #25728: [SPARK-29020][WIP][SQL] Improving array_sort behaviour
Date Fri, 25 Oct 2019 08:14:37 GMT
kiszk commented on a change in pull request #25728: [SPARK-29020][WIP][SQL] Improving array_sort
behaviour
URL: https://github.com/apache/spark/pull/25728#discussion_r338934735
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ##########
 @@ -285,6 +287,104 @@ case class ArrayTransform(
   override def prettyName: String = "transform"
 }
 
+/**
+ * Sorts the elements of an array.
+ *
+ * Two call shapes are supported: with only the array, elements are sorted by their natural
+ * ordering; with an additional lambda, the lambda is used as an integer-returning comparator
+ * over pairs of elements.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Sorts the input array in ascending order. The elements of the " +
+    "input array must be orderable. Null elements will be placed at the end of the returned " +
+    "array. Since 3.0.0 also sorts and returns the array based on the given " +
+    "comparator function. The comparator will take two nullable arguments " +
+    "representing two nullable elements of the array. " +
+    "It returns -1, 0, or 1 as the first nullable element is less than, equal to, or greater " +
+    "than the second nullable element. If the comparator function returns other " +
+    "values (including NULL), the query will fail and raise an error.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(5, 6, 1), (x, y) -> f(x, y));
+       [1,5,6]
+      > SELECT _FUNC_(array('bc', 'ab', 'dc'), (x, y) -> f(x, y));
+       ["dc", "bc", "ab"]
+      > SELECT _FUNC_(array('b', 'd', null, 'c', 'a'));
+       ["d", "c", "b", "a", null]
+  """,
+  since = "2.4.0")
+case class ArraySort(
+    argument: Expression,
+    function: Expression)
+  extends ArrayBasedSimpleHigherOrderFunction with ArraySortLike with CodegenFallback {
+
+  /**
+   * Single-argument form, used when no comparator lambda is given. `Literal(true)` is stored
+   * in `function`; other members test `function.dataType == BooleanType` to detect this case.
+   */
+  def this(argument: Expression) = {
+    this(argument, Literal(true))
+  }
+
+  /** Element type of the input array; the cast assumes `argument` is array-typed. */
+  @transient lazy val argumentsType: DataType = {
+    val inputArrayType = argument.dataType.asInstanceOf[ArrayType]
+    inputArrayType.elementType
+  }
+
+  /** The expression that produces the array to sort, i.e. `argument`. */
+  override protected def arrayExpression: Expression = {
+    argument
+  }
+
+  /** Null ordering used by this expression: always `NullOrder.Greatest`. */
+  override protected def nullOrder: NullOrder = {
+    NullOrder.Greatest
+  }
+
+  /** The result has the same type as the input, which is cast to `ArrayType`. */
+  override def dataType: ArrayType = {
+    val inputType = argument.dataType
+    inputType.asInstanceOf[ArrayType]
+  }
+
+  /**
+   * Validates the inputs for both supported call shapes.
+   *
+   * If `function` has `BooleanType` (the single-argument constructor stores `Literal(true)`),
+   * the input must be an array whose element type is orderable. Otherwise the generic argument
+   * check runs first, and then `function` must be a two-argument lambda returning `IntegerType`.
+   *
+   * @return `TypeCheckSuccess`, or a `TypeCheckFailure` describing the first violated rule
+   */
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (function.dataType == BooleanType) {
+      argument.dataType match {
+        case ArrayType(dt, _) if RowOrdering.isOrderable(dt) =>
+          TypeCheckResult.TypeCheckSuccess
+        case ArrayType(dt, _) =>
+          val dtSimple = dt.catalogString
+          TypeCheckResult.TypeCheckFailure(
+            s"$prettyName does not support sorting array of type $dtSimple which is not orderable")
+        case _ =>
+          TypeCheckResult.TypeCheckFailure(s"$prettyName only supports array input.")
+      }
+    } else {
+      checkArgumentDataTypes() match {
+        case TypeCheckResult.TypeCheckSuccess =>
+          // Use a total match rather than a `val` pattern so that an unexpected expression
+          // produces a type-check failure instead of a MatchError, and report an arity
+          // problem separately from a return-type problem.
+          function match {
+            case LambdaFunction(_, arguments, _) if arguments.size != 2 =>
+              TypeCheckResult.TypeCheckFailure(
+                s"$prettyName expects a comparator function with exactly 2 arguments, " +
+                  s"but got ${arguments.size}")
+            case _: LambdaFunction if function.dataType != IntegerType =>
+              TypeCheckResult.TypeCheckFailure("Return type of the given function has to be " +
+                "IntegerType")
+            case _: LambdaFunction =>
+              TypeCheckResult.TypeCheckSuccess
+            case _ =>
+              TypeCheckResult.TypeCheckFailure(
+                s"$prettyName expects a lambda function as its second argument")
+          }
+        case failure => failure
+      }
+    }
+  }
+
+  /**
+   * Binds the comparator lambda by calling `f` with `function` and two identical
+   * (element type, containsNull) entries taken from the input array type, then returns a copy
+   * of this expression holding the bound lambda.
+   */
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraySort = {
+    val ArrayType(elemType, elemNullable) = argument.dataType
+    val lambdaParam = (elemType, elemNullable)
+    val boundFunction = f(function, List(lambdaParam, lambdaParam))
+    copy(function = boundFunction)
+  }
+
+  /**
+   * The first two lambda arguments of `function`, as `NamedLambdaVariable`s. Destructuring
+   * assumes `function` is a `LambdaFunction` with at least two arguments.
+   */
+  @transient lazy val (firstParam, secondParam) = {
+    val LambdaFunction(_, (lhs: NamedLambdaVariable) +: remaining, _) = function
+    val rhs = remaining.head.asInstanceOf[NamedLambdaVariable]
+    lhs -> rhs
+  }
+
+  /**
+   * Builds a `Comparator` that delegates each comparison to the comparator lambda.
+   *
+   * For every `compare(o1, o2)` call, `o1` is written to the first lambda variable and `o2` to
+   * the second, then the lambda is evaluated against `inputRow` and its result is returned.
+   *
+   * @param inputRow the row against which the lambda body is evaluated
+   * @return a comparator over array elements
+   */
+  def comparator(inputRow: InternalRow): Comparator[Any] = {
+    val f = functionForEval
+    (o1: Any, o2: Any) => {
+      // Bind in declaration order so that the lambda's first parameter is the comparator's
+      // left operand. Binding them the other way round inverts the resulting sort order
+      // relative to what the lambda author wrote.
+      firstParam.value.set(o1)
+      secondParam.value.set(o2)
+      // NOTE(review): a null result unboxes to 0 here (treated as "equal") rather than
+      // failing the query as the usage text promises -- confirm the intended behaviour.
+      f.eval(inputRow).asInstanceOf[Int]
+    }
+  }
+
+  override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = {
+    val arr = argumentValue.asInstanceOf[ArrayData]
+    sortEval(arr, if (function.dataType == BooleanType) lt else comparator(inputRow))
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message