spark-commits mailing list archives

From: joshro...@apache.org
Subject: spark git commit: [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in SMJ
Date: Wed, 05 Aug 2015 23:34:29 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 30e9fcfb3 -> 618dc63e7


[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in SMJ

This patch renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use
the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen.

This is an updated version of #7408.
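
For context, here are the two paths an operator can take after this change (an illustrative sketch modeled on the CodeGenerationSuite test further down; the column types are arbitrary):

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // Interpreted path: walks the SortOrder expressions on every comparison.
    val interpreted = InterpretedOrdering.forSchema(Seq(IntegerType, StringType))
    // Generated path: compiles a specialized comparator; SparkPlan.newOrdering
    // prefers this and falls back to the interpreted form if codegen fails.
    val generated = GenerateOrdering.generate(
      BoundReference(0, IntegerType, nullable = true).asc ::
        BoundReference(1, StringType, nullable = true).asc :: Nil)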

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits:

e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.

(cherry picked from commit 9c878923db6634effed98c99bf24dd263bb7c6ad)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/618dc63e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/618dc63e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/618dc63e

Branch: refs/heads/branch-1.5
Commit: 618dc63e72ee69ede127004bb3b9c02b92ff703c
Parents: 30e9fcf
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Wed Aug 5 16:33:42 2015 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Wed Aug 5 16:34:02 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/RowOrdering.scala  | 93 -------------------
 .../sql/catalyst/expressions/arithmetic.scala   |  4 +-
 .../sql/catalyst/expressions/conditionals.scala |  4 +-
 .../sql/catalyst/expressions/ordering.scala     | 96 ++++++++++++++++++++
 .../sql/catalyst/expressions/predicates.scala   |  8 +-
 .../spark/sql/catalyst/util/TypeUtils.scala     |  4 +-
 .../org/apache/spark/sql/types/StructType.scala |  4 +-
 .../expressions/CodeGenerationSuite.scala       |  2 +-
 .../apache/spark/sql/execution/Exchange.scala   |  5 +-
 .../apache/spark/sql/execution/SparkPlan.scala  | 14 ++-
 .../spark/sql/execution/basicOperators.scala    |  4 +-
 .../sql/execution/joins/SortMergeJoin.scala     |  9 +-
 .../execution/UnsafeKVExternalSorterSuite.scala |  6 +-
 13 files changed, 136 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
deleted file mode 100644
index 873f532..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-
-/**
- * An interpreted row ordering comparator.
- */
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
-
-  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
-    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
-
-  def compare(a: InternalRow, b: InternalRow): Int = {
-    var i = 0
-    while (i < ordering.size) {
-      val order = ordering(i)
-      val left = order.child.eval(a)
-      val right = order.child.eval(b)
-
-      if (left == null && right == null) {
-        // Both null, continue looking.
-      } else if (left == null) {
-        return if (order.direction == Ascending) -1 else 1
-      } else if (right == null) {
-        return if (order.direction == Ascending) 1 else -1
-      } else {
-        val comparison = order.dataType match {
-          case dt: AtomicType if order.direction == Ascending =>
-            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case dt: AtomicType if order.direction == Descending =>
-            dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case s: StructType if order.direction == Ascending =>
-            s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case s: StructType if order.direction == Descending =>
-            s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case other =>
-            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
-        }
-        if (comparison != 0) {
-          return comparison
-        }
-      }
-      i += 1
-    }
-    return 0
-  }
-}
-
-object RowOrdering {
-
-  /**
-   * Returns true iff the data type can be ordered (i.e. can be sorted).
-   */
-  def isOrderable(dataType: DataType): Boolean = dataType match {
-    case NullType => true
-    case dt: AtomicType => true
-    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
-    case _ => false
-  }
-
-  /**
-   * Returns true iff outputs from the expressions can be ordered.
-   */
-  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
-
-  /**
-   * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
-   */
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 5808e3f..98464ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 961b1d8..d51f3d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
new file mode 100644
index 0000000..6407c73
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted row ordering comparator.
+ */
+class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+  def compare(a: InternalRow, b: InternalRow): Int = {
+    var i = 0
+    while (i < ordering.size) {
+      val order = ordering(i)
+      val left = order.child.eval(a)
+      val right = order.child.eval(b)
+
+      if (left == null && right == null) {
+        // Both null, continue looking.
+      } else if (left == null) {
+        return if (order.direction == Ascending) -1 else 1
+      } else if (right == null) {
+        return if (order.direction == Ascending) 1 else -1
+      } else {
+        val comparison = order.dataType match {
+          case dt: AtomicType if order.direction == Ascending =>
+            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+          case dt: AtomicType if order.direction == Descending =>
+            dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+          case s: StructType if order.direction == Ascending =>
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
+          case s: StructType if order.direction == Descending =>
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+          case other =>
+            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
+        }
+        if (comparison != 0) {
+          return comparison
+        }
+      }
+      i += 1
+    }
+    return 0
+  }
+}
+
+object InterpretedOrdering {
+
+  /**
+   * Creates a [[InterpretedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
+    new InterpretedOrdering(dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    })
+  }
+}
+
+object RowOrdering {
+
+  /**
+   * Returns true iff the data type can be ordered (i.e. can be sorted).
+   */
+  def isOrderable(dataType: DataType): Boolean = dataType match {
+    case NullType => true
+    case dt: AtomicType => true
+    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
+    case _ => false
+  }
+
+  /**
+   * Returns true iff outputs from the expressions can be ordered.
+   */
+  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 68c832d..fe7dffb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryCompariso
 
   override def symbol: String = "<"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
 }
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
 
   override def symbol: String = "<="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
 }
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryCompar
 
   override def symbol: String = ">"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
 }
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
 
   override def symbol: String = ">="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 0b41f92..bcf4d78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -54,10 +54,10 @@ object TypeUtils {
   def getNumeric(t: DataType): Numeric[Any] =
     t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
 
-  def getOrdering(t: DataType): Ordering[Any] = {
+  def getInterpretedOrdering(t: DataType): Ordering[Any] = {
     t match {
       case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
+      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
     }
   }
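
A quick illustration of the renamed helper (a sketch; the values are boxed to Any, matching the comparisons in predicates.scala above):

    import org.apache.spark.sql.catalyst.util.TypeUtils
    import org.apache.spark.sql.types.IntegerType

    val ord = TypeUtils.getInterpretedOrdering(IntegerType)
    assert(ord.lt(1, 2))  // the same ordering that backs LessThan's nullSafeEval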
 

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 6928707..9cbc207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute, InterpretedOrdering$}
 
 
 /**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     StructType(newFields)
   }
 
-  private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
+  private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 }
 
 object StructType extends AbstractDataType {
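
Since InterpretedOrdering delegates struct comparisons to this new val, struct-typed columns order field-by-field; a sketch (again assuming the InternalRow(...) factory, and that struct values are stored as nested InternalRows):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.InterpretedOrdering
    import org.apache.spark.sql.types._

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    val ord = InterpretedOrdering.forSchema(Seq(schema))
    // The nested field decides the order of the outer rows.
    assert(ord.compare(InternalRow(InternalRow(1)), InternalRow(InternalRow(2))) < 0)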

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index cc82f7c..e310aee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   // GenerateOrdering agrees with RowOrdering.
   (DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
     test(s"GenerateOrdering with $dataType") {
-      val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
+      val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
       val genOrdering = GenerateOrdering.generate(
         BoundReference(0, dataType, nullable = true).asc ::
           BoundReference(1, dataType, nullable = true).asc :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 05b009d..6ea5eee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -156,7 +156,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+        // We need to use an interpreted ordering here because generated orderings cannot be
+        // serialized and this ordering needs to be created on the driver in order to be passed into
+        // Spark core code.
+        implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
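
The serializability constraint in that comment can be sanity-checked directly, since scala.math.Ordering extends Serializable and InterpretedOrdering is a plain class (a sketch; sortingExpressions and child as in the hunk above):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Must not throw: the ordering has to survive being shipped with the
    // RangePartitioner from the driver to the executors.
    val ord = new InterpretedOrdering(sortingExpressions, child.output)
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(ord)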

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index dbc0cef..2f29067 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.DataType
 
 object SparkPlan {
   protected[sql] val currentContext = new ThreadLocal[SQLContext]()
@@ -309,13 +310,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             throw e
           } else {
             log.error("Failed to generate ordering, fallback to interpreted", e)
-            new RowOrdering(order, inputSchema)
+            new InterpretedOrdering(order, inputSchema)
           }
       }
     } else {
-      new RowOrdering(order, inputSchema)
+      new InterpretedOrdering(order, inputSchema)
     }
   }
+  /**
+   * Creates a row ordering for the given schema, in natural ascending order.
+   */
+  protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
+    val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    }
+    newOrdering(order, Seq.empty)
+  }
 }
 
 private[sql] trait LeafNode extends SparkPlan {

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4771702..f4677b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -212,7 +212,9 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
+  // We need to use an interpreted ordering here because generated orderings cannot be serialized
+  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
+  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index eb59549..4ae23c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -48,9 +48,6 @@ case class SortMergeJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
-
   override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
@@ -59,8 +56,10 @@ case class SortMergeJoin(
   @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
   @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)
 
-  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
+  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
+    // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
     keys.map(SortOrder(_, Ascending))
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = left.execute().map(_.copy())
@@ -68,6 +67,8 @@ case class SortMergeJoin(
 
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
       new Iterator[InternalRow] {
+        // An ordering that can be used to compare keys from both sides.
+        private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow
         private[this] var leftElement: InternalRow = _
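
For reference, here is the shape of the comparison the merge loop drives with keyOrdering (a simplified sketch, not the actual iterator code):

    val cmp = keyOrdering.compare(leftKeyGenerator(leftElement), rightKeyGenerator(rightElement))
    if (cmp == 0) {
      // Keys match: buffer the matching right rows and emit joinRow(left, right).
    } else if (cmp < 0) {
      // Left key sorts first: advance the left side.
    } else {
      // Right key sorts first: advance the right side.
    }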

http://git-wip-us.apache.org/repos/asf/spark/blob/618dc63e/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 08156f0..a9515a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, RowOrdering, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -144,8 +144,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
     }
     sorter.cleanupResources()
 
-    val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType))
-    val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType))
+    val keyOrdering = InterpretedOrdering.forSchema(keySchema.map(_.dataType))
+    val valueOrdering = InterpretedOrdering.forSchema(valueSchema.map(_.dataType))
     val kvOrdering = new Ordering[(InternalRow, InternalRow)] {
      override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = {
         keyOrdering.compare(x._1, y._1) match {

