spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-9420][SQL] Move expressions in sql/core package to catalyst.
Date Wed, 29 Jul 2015 00:04:04 GMT
Repository: spark
Updated Branches:
  refs/heads/master c5ed36953 -> b7f54119f


[SPARK-9420][SQL] Move expressions in sql/core package to catalyst.

Since the catalyst package already depends on Spark core, we can move those expressions
into catalyst and simplify the function registry.

This is a follow-up to #7478.
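
As a condensed before/after view of the registry simplification (lifted from
the SQLContext and HiveContext hunks below):

    // Before: sql/core (and similarly sql/hive) appended the expression to
    // the builtin registry by hand, under the name spark__partition__id.
    protected[sql] lazy val functionRegistry: FunctionRegistry = {
      val reg = FunctionRegistry.builtin
      val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
        FunctionExpression[SparkPartitionID]("spark__partition__id")
      )
      extendedFunctions.foreach { case (name, (info, fun)) => reg.registerFunction(name, info, fun) }
      reg
    }

    // After: SparkPartitionID lives in catalyst and is registered as the
    // builtin spark_partition_id, so each context reuses the shared registry.
    protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin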

Author: Reynold Xin <rxin@databricks.com>

Closes #7735 from rxin/SPARK-8003 and squashes the following commits:

2ffbdc3 [Reynold Xin] [SPARK-8003][SQL] Move expressions in sql/core package to catalyst.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7f54119
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7f54119
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7f54119

Branch: refs/heads/master
Commit: b7f54119f86f916481aeccc67f07e77dc2a924c7
Parents: c5ed369
Author: Reynold Xin <rxin@databricks.com>
Authored: Tue Jul 28 17:03:59 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Tue Jul 28 17:03:59 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  3 +-
 .../catalyst/analysis/FunctionRegistry.scala    | 17 ++---
 .../expressions/MonotonicallyIncreasingID.scala | 73 +++++++++++++++++++
 .../catalyst/expressions/SparkPartitionID.scala | 50 +++++++++++++
 .../expressions/NondeterministicSuite.scala     | 30 ++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala | 11 +--
 .../expressions/MonotonicallyIncreasingID.scala | 74 --------------------
 .../expressions/SparkPartitionID.scala          | 51 --------------
 .../sql/execution/expressions/package.scala     | 23 ------
 .../scala/org/apache/spark/sql/functions.scala  |  4 +-
 .../scala/org/apache/spark/sql/UDFSuite.scala   |  4 +-
 .../expression/NondeterministicSuite.scala      | 32 ---------
 .../org/apache/spark/sql/hive/HiveContext.scala | 13 +---
 .../org/apache/spark/sql/hive/UDFSuite.scala    |  4 +-
 14 files changed, 173 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a723e92..a309ee3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2}
 import org.apache.spark.sql.catalyst.expressions._
@@ -25,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
 import org.apache.spark.sql.types._
-import scala.collection.mutable.ArrayBuffer
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 9b60943..372f80d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -161,13 +161,6 @@ object FunctionRegistry {
     expression[ToDegrees]("degrees"),
     expression[ToRadians]("radians"),
 
-    // misc functions
-    expression[Md5]("md5"),
-    expression[Sha2]("sha2"),
-    expression[Sha1]("sha1"),
-    expression[Sha1]("sha"),
-    expression[Crc32]("crc32"),
-
     // aggregate functions
     expression[Average]("avg"),
     expression[Count]("count"),
@@ -229,7 +222,15 @@ object FunctionRegistry {
     expression[Year]("year"),
 
     // collection functions
-    expression[Size]("size")
+    expression[Size]("size"),
+
+    // misc functions
+    expression[Crc32]("crc32"),
+    expression[Md5]("md5"),
+    expression[Sha1]("sha"),
+    expression[Sha1]("sha1"),
+    expression[Sha2]("sha2"),
+    expression[SparkPartitionID]("spark_partition_id")
   )
 
   val builtin: FunctionRegistry = {
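
With SparkPartitionID registered as a builtin (note the rename from
spark__partition__id to spark_partition_id), the function now resolves in
plain SQL without per-context wiring. A minimal usage sketch, assuming a
SQLContext in scope and a temp table named tmp_table (hypothetical name):

    sqlContext.sql("SELECT spark_partition_id() FROM tmp_table")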

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
new file mode 100644
index 0000000..291b7a5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.types.{LongType, DataType}
+
+/**
+ * Returns monotonically increasing 64-bit integers.
+ *
+ * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+ * The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits
+ * represent the record number within each partition. The assumption is that the data frame has
+ * less than 1 billion partitions, and each partition has less than 8 billion records.
+ *
+ * Since this expression is stateful, it cannot be a case object.
+ */
+private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
+
+  /**
+   * Record ID within each partition. By being transient, count's value is reset to 0 every time
+   * we serialize and deserialize and initialize it.
+   */
+  @transient private[this] var count: Long = _
+
+  @transient private[this] var partitionMask: Long = _
+
+  override protected def initInternal(): Unit = {
+    count = 0L
+    partitionMask = TaskContext.getPartitionId().toLong << 33
+  }
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = LongType
+
+  override protected def evalInternal(input: InternalRow): Long = {
+    val currentCount = count
+    count += 1
+    partitionMask + currentCount
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val countTerm = ctx.freshName("count")
+    val partitionMaskTerm = ctx.freshName("partitionMask")
+    ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;")
+    ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm,
+      s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;")
+
+    ev.isNull = "false"
+    s"""
+      final ${ctx.javaType(dataType)} ${ev.primitive} = $partitionMaskTerm + $countTerm;
+      $countTerm++;
+    """
+  }
+}
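
To make the bit layout in the comment above concrete: for partition id p and
within-partition record number r, the generated value is (p.toLong << 33) + r.
A small worked example:

    val partitionMask = 2L << 33   // partition 2        -> 17179869184
    val id = partitionMask + 5L    // 6th record (r = 5) -> 17179869189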

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
new file mode 100644
index 0000000..3f6480b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.types.{IntegerType, DataType}
+
+
+/**
+ * Expression that returns the current partition id of the Spark task.
+ */
+private[sql] case class SparkPartitionID() extends LeafExpression with Nondeterministic {
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = IntegerType
+
+  @transient private[this] var partitionId: Int = _
+
+  override protected def initInternal(): Unit = {
+    partitionId = TaskContext.getPartitionId()
+  }
+
+  override protected def evalInternal(input: InternalRow): Int = partitionId
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val idTerm = ctx.freshName("partitionId")
+    ctx.addMutableState(ctx.JAVA_INT, idTerm,
+      s"$idTerm = org.apache.spark.TaskContext.getPartitionId();")
+    ev.isNull = "false"
+    s"final ${ctx.javaType(dataType)} ${ev.primitive} = $idTerm;"
+  }
+}
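
SparkPartitionID mixes in Nondeterministic because its value depends on the
physical placement of rows rather than on any input: it is fixed once per
partition in initInternal and changes if the data is repartitioned. An
illustrative check at the DataFrame level (a sketch, assuming a SQLContext in
scope and the sparkPartitionId() column function updated further down):

    import org.apache.spark.sql.functions._
    val df = sqlContext.range(0, 8)
    df.select(sparkPartitionId()).distinct().count()                 // one id per input partition
    df.repartition(2).select(sparkPartitionId()).distinct().count()  // at most 2 afterwards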

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
new file mode 100644
index 0000000..8289482
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+
+class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
+  test("MonotonicallyIncreasingID") {
+    checkEvaluation(MonotonicallyIncreasingID(), 0L)
+  }
+
+  test("SparkPartitionID") {
+    checkEvaluation(SparkPartitionID(), 0)
+  }
+}
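
Both expected values follow from the implementations above: checkEvaluation
runs without an active TaskContext, and TaskContext.getPartitionId() returns 0
in that case, so the partition mask is 0 and the first generated ID is 0L.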

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 56cd8f2..dbb2a09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,8 +31,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -142,14 +140,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   // TODO how to handle the temp function per user session?
   @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = {
-    val reg = FunctionRegistry.builtin
-    val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
-      FunctionExpression[SparkPartitionID]("spark__partition__id")
-    )
-    extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
-    reg
-  }
+  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
deleted file mode 100644
index eca36b3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.expressions
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
-import org.apache.spark.sql.types.{LongType, DataType}
-
-/**
- * Returns monotonically increasing 64-bit integers.
- *
- * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
- * The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits
- * represent the record number within each partition. The assumption is that the data frame has
- * less than 1 billion partitions, and each partition has less than 8 billion records.
- *
- * Since this expression is stateful, it cannot be a case object.
- */
-private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
-
-  /**
-   * Record ID within each partition. By being transient, count's value is reset to 0 every time
-   * we serialize and deserialize and initialize it.
-   */
-  @transient private[this] var count: Long = _
-
-  @transient private[this] var partitionMask: Long = _
-
-  override protected def initInternal(): Unit = {
-    count = 0L
-    partitionMask = TaskContext.getPartitionId().toLong << 33
-  }
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = LongType
-
-  override protected def evalInternal(input: InternalRow): Long = {
-    val currentCount = count
-    count += 1
-    partitionMask + currentCount
-  }
-
-  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
-    val countTerm = ctx.freshName("count")
-    val partitionMaskTerm = ctx.freshName("partitionMask")
-    ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;")
-    ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm,
-      s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;")
-
-    ev.isNull = "false"
-    s"""
-      final ${ctx.javaType(dataType)} ${ev.primitive} = $partitionMaskTerm + $countTerm;
-      $countTerm++;
-    """
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
deleted file mode 100644
index 98c8eab..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.expressions
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
-import org.apache.spark.sql.types.{IntegerType, DataType}
-
-
-/**
- * Expression that returns the current partition id of the Spark task.
- */
-private[sql] case class SparkPartitionID() extends LeafExpression with Nondeterministic {
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = IntegerType
-
-  @transient private[this] var partitionId: Int = _
-
-  override protected def initInternal(): Unit = {
-    partitionId = TaskContext.getPartitionId()
-  }
-
-  override protected def evalInternal(input: InternalRow): Int = partitionId
-
-  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
-    val idTerm = ctx.freshName("partitionId")
-    ctx.addMutableState(ctx.JAVA_INT, idTerm,
-      s"$idTerm = org.apache.spark.TaskContext.getPartitionId();")
-    ev.isNull = "false"
-    s"final ${ctx.javaType(dataType)} ${ev.primitive} = $idTerm;"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
deleted file mode 100644
index 568b7ac..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-/**
- * Package containing expressions that are specific to Spark runtime.
- */
-package object expressions

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0148991..4261a5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -634,7 +634,7 @@ object functions {
    * @group normal_funcs
    * @since 1.4.0
    */
-  def monotonicallyIncreasingId(): Column = execution.expressions.MonotonicallyIncreasingID()
+  def monotonicallyIncreasingId(): Column = MonotonicallyIncreasingID()
 
   /**
    * Return an alternative value `r` if `l` is NaN.
@@ -741,7 +741,7 @@ object functions {
    * @group normal_funcs
    * @since 1.4.0
    */
-  def sparkPartitionId(): Column = execution.expressions.SparkPartitionID()
+  def sparkPartitionId(): Column = SparkPartitionID()
 
   /**
    * Computes the square root of the specified float value.
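
The public column functions are otherwise unchanged; they simply construct the
catalyst expressions directly now. Typical usage (a sketch, assuming a
SQLContext in scope):

    import org.apache.spark.sql.functions._
    val df = sqlContext.range(0, 4)
    df.select(monotonicallyIncreasingId().as("row_id"), sparkPartitionId().as("pid"))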

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9b326c1..d9c8b38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -51,10 +51,10 @@ class UDFSuite extends QueryTest {
     df.selectExpr("count(distinct a)")
   }
 
-  test("SPARK-8003 spark__partition__id") {
+  test("SPARK-8003 spark_partition_id") {
     val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying")
     df.registerTempTable("tmp_table")
-    checkAnswer(ctx.sql("select spark__partition__id() from tmp_table").toDF(), Row(0))
+    checkAnswer(ctx.sql("select spark_partition_id() from tmp_table").toDF(), Row(0))
     ctx.dropTempTable("tmp_table")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
deleted file mode 100644
index b6e79ff..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.expression
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
-import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID}
-
-class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
-  test("MonotonicallyIncreasingID") {
-    checkEvaluation(MonotonicallyIncreasingID(), 0L)
-  }
-
-  test("SparkPartitionID") {
-    checkEvaluation(SparkPartitionID(), 0)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8b35c12..110f51a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,9 +38,6 @@ import org.apache.spark.Logging
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect}
@@ -375,14 +372,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
 
   // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
-  override protected[sql] lazy val functionRegistry: FunctionRegistry = {
-    val reg = new HiveFunctionRegistry(FunctionRegistry.builtin)
-    val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
-      FunctionExpression[SparkPartitionID]("spark__partition__id")
-    )
-    extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
-    reg
-  }
+  override protected[sql] lazy val functionRegistry: FunctionRegistry =
+    new HiveFunctionRegistry(FunctionRegistry.builtin)
 
   /* An analyzer that uses the Hive metastore. */
   @transient

http://git-wip-us.apache.org/repos/asf/spark/blob/b7f54119/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 9cea5d4..37afc21 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -35,9 +35,9 @@ class UDFSuite extends QueryTest {
     assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
   }
 
-  test("SPARK-8003 spark__partition__id") {
+  test("SPARK-8003 spark_partition_id") {
     val df = Seq((1, "Two Fiiiiive")).toDF("id", "saying")
     ctx.registerDataFrameAsTable(df, "test_table")
-    checkAnswer(ctx.sql("select spark__partition__id() from test_table LIMIT 1").toDF(), Row(0))
+    checkAnswer(ctx.sql("select spark_partition_id() from test_table LIMIT 1").toDF(), Row(0))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

