spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-5193][SQL] Tighten up SQLContext API
Date Thu, 15 Jan 2015 02:45:00 GMT
Repository: spark
Updated Branches:
  refs/heads/master 13d240678 -> cfa397c12


[SPARK-5193][SQL] Tighten up SQLContext API

1. Removed 2 implicits (logicalPlanToSparkQuery and baseRelationToSchemaRDD)
2. Moved extraStrategies into ExperimentalMethods.
3. Made private methods protected[sql] so they don't show up in javadocs.
4. Removed createParquetFile.
5. Added Java version of applySchema to SQLContext.
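
A minimal usage sketch (not part of this patch) of the reworked API, assuming a
local build at this snapshot; `relation` stands for a hypothetical BaseRelation
produced by an external data source and is not defined here:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object SQLContextApiSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)

        // (2) Planner extensions now live on the ExperimentalMethods holder.
        sqlContext.experimental.extraStrategies = Nil

        // (1) baseRelationToSchemaRDD is an explicit method, no longer an implicit:
        //     val schemaRdd = sqlContext.baseRelationToSchemaRDD(relation)

        // (4) createParquetFile is removed; one alternative is writing an existing
        //     SchemaRDD out with saveAsParquetFile and reading it back with parquetFile.

        sc.stop()
      }
    }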

Author: Reynold Xin <rxin@databricks.com>

Closes #4049 from rxin/sqlContext-refactor and squashes the following commits:

a326a1a [Reynold Xin] Remove createParquetFile and add applySchema for Java to SQLContext.
ecd6685 [Reynold Xin] Added baseRelationToSchemaRDD back.
4a38c9b [Reynold Xin] [SPARK-5193][SQL] Tighten up SQLContext API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfa397c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfa397c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfa397c1

Branch: refs/heads/master
Commit: cfa397c126c857bfc9843d9e598a14b7c1e0457f
Parents: 13d2406
Author: Reynold Xin <rxin@databricks.com>
Authored: Wed Jan 14 18:36:15 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Jan 14 18:36:15 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/ExperimentalMethods.scala  |  36 +++++
 .../scala/org/apache/spark/sql/SQLContext.scala | 152 +++++++++++-------
 .../apache/spark/sql/execution/commands.scala   |  10 +-
 .../org/apache/spark/sql/sources/ddl.scala      |   5 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |  16 +-
 .../org/apache/spark/sql/InsertIntoSuite.scala  | 160 -------------------
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  26 ---
 .../spark/sql/parquet/ParquetQuerySuite2.scala  |  22 ---
 .../org/apache/spark/sql/hive/HiveContext.scala |   4 +-
 .../org/apache/spark/sql/hive/TestHive.scala    |   2 +-
 10 files changed, 152 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
new file mode 100644
index 0000000..f0e6a8f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Holder for experimental methods for the bravest. We make NO guarantee about the stability
+ * regarding binary compatibility and source compatibility of methods here.
+ */
+@Experimental
+class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
+
+  /**
+   * Allows extra strategies to be injected into the query planner at runtime.  Note this API
+   * should be consider experimental and is not intended to be stable across releases.
+   */
+  @Experimental
+  var extraStrategies: Seq[Strategy] = Nil
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d9f3b3a..279671c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.beans.Introspector
 import java.util.Properties
 
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
+import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * :: AlphaComponent ::
@@ -59,7 +60,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   // Note that this is a lazy val so we can override the default value in subclasses.
-  private[sql] lazy val conf: SQLConf = new SQLConf
+  protected[sql] lazy val conf: SQLConf = new SQLConf
 
   /** Set Spark SQL configuration properties. */
   def setConf(props: Properties): Unit = conf.setConf(props)
@@ -118,15 +119,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
-   * :: DeveloperApi ::
-   * Allows catalyst LogicalPlans to be executed as a SchemaRDD.  Note that the LogicalPlan
-   * interface is considered internal, and thus not guaranteed to be stable.  As a result, using
-   * them directly is not recommended.
-   */
-  @DeveloperApi
-  implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
-
-  /**
    * Creates a SchemaRDD from an RDD of case classes.
    *
    * @group userf
@@ -139,8 +131,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
-  implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
-    logicalPlanToSparkQuery(LogicalRelation(baseRelation))
+  /**
+   * Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
+   */
+  def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
+    new SchemaRDD(this, LogicalRelation(baseRelation))
   }
 
   /**
@@ -182,6 +177,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
+   * Applies a schema to an RDD of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+   *          SELECT * queries will return the columns in an undefined order.
+   */
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
+    val attributeSeq = getSchema(beanClass)
+    val className = beanClass.getName
+    val rowRdd = rdd.mapPartitions { iter =>
+      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
+      val localBeanInfo = Introspector.getBeanInfo(
+        Class.forName(className, true, Utils.getContextOrSparkClassLoader))
+      val extractors =
+        localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
+
+      iter.map { row =>
+        new GenericRow(
+          extractors.zip(attributeSeq).map { case (e, attr) =>
+            DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
+          }.toArray[Any]
+        ) : Row
+      }
+    }
+    new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
+  }
+
+  /**
+   * Applies a schema to an RDD of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+   *          SELECT * queries will return the columns in an undefined order.
+   */
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
+    applySchema(rdd.rdd, beanClass)
+  }
+
+  /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
    *
    * @group userf
@@ -260,41 +292,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
-   * :: Experimental ::
-   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
-   * This registered table can be used as the target of future `insertInto` operations.
-   *
-   * {{{
-   *   val sqlContext = new SQLContext(...)
-   *   import sqlContext._
-   *
-   *   case class Person(name: String, age: Int)
-   *   createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
-   *   sql("INSERT INTO people SELECT 'michael', 29")
-   * }}}
-   *
-   * @tparam A A case class type that describes the desired schema of the parquet file to be
-   *           created.
-   * @param path The path where the directory containing parquet metadata should be created.
-   *             Data inserted into this table will also be stored at this location.
-   * @param allowExisting When false, an exception will be thrown if this directory already exists.
-   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
-   *             output format.
-   *
-   * @group userf
-   */
-  @Experimental
-  def createParquetFile[A <: Product : TypeTag](
-      path: String,
-      allowExisting: Boolean = true,
-      conf: Configuration = new Configuration()): SchemaRDD = {
-    new SchemaRDD(
-      this,
-      ParquetRelation.createEmpty(
-        path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
-  }
-
-  /**
    * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
    * during the lifetime of this instance of SQLContext.
    *
@@ -336,12 +333,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
 
   /**
-   * :: DeveloperApi ::
-   * Allows extra strategies to be injected into the query planner at runtime.  Note this API
-   * should be consider experimental and is not intended to be stable across releases.
+   * A collection of methods that are considered experimental, but can be used to hook into
+   * the query planner for advanced functionalities.
    */
-  @DeveloperApi
-  var extraStrategies: Seq[Strategy] = Nil
+  val experimental: ExperimentalMethods = new ExperimentalMethods(this)
 
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +348,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
-      extraStrategies ++ (
+      experimental.extraStrategies ++ (
       DataSourceStrategy ::
       DDLStrategy ::
       TakeOrdered ::
@@ -479,14 +474,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
    */
-  private[sql] def parseDataType(dataTypeString: String): DataType = {
+  protected[sql] def parseDataType(dataTypeString: String): DataType = {
     DataType.fromJson(dataTypeString)
   }
 
   /**
    * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
    */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schemaString: String): SchemaRDD = {
     val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +491,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
    */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): SchemaRDD = {
 
@@ -527,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
+
+  /**
+   * Returns a Catalyst Schema for the given java bean class.
+   */
+  protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+
+    // Note: The ordering of elements may differ from when the schema is inferred in Scala.
+    //       This is because beanInfo.getPropertyDescriptors gives no guarantees about
+    //       element ordering.
+    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+    fields.map { property =>
+      val (dataType, nullable) = property.getPropertyType match {
+        case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
+          (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
+        case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
+        case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
+        case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
+        case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
+        case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
+        case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
+        case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
+        case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
+
+        case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
+        case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
+        case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
+        case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
+        case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
+        case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
+        case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
+        case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
+        case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+        case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+      }
+      AttributeReference(property.getName, dataType, nullable)()
+    }
+  }
 }
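
A hedged sketch (not part of this patch) of the new Java-bean applySchema overload
added above; `Person` is a hypothetical bean-style class defined only for
illustration, and the schema inferred from it has no guaranteed column order:

    import scala.beans.BeanProperty

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Bean-style class: applySchema infers the schema from its getters.
    class Person(@BeanProperty var name: String, @BeanProperty var age: Int) extends Serializable {
      def this() = this(null, 0)
    }

    object ApplySchemaSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)

        val people = sc.parallelize(Seq(new Person("michael", 29), new Person("andy", 30)))
        val schemaRdd = sqlContext.applySchema(people, classOf[Person])
        schemaRdd.registerTempTable("people")

        // Avoid SELECT * here: bean property order is undefined.
        sqlContext.sql("SELECT name FROM people WHERE age > 29").collect().foreach(println)

        sc.stop()
      }
    }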

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af6b07b..52a31f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{SQLConf, SQLContext}
 
 /**
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
     isLazy: Boolean) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    import sqlContext._
-
-    plan.foreach(_.registerTempTable(tableName))
-    cacheTable(tableName)
+    plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
+    sqlContext.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      table(tableName).count()
+      sqlContext.table(tableName).count()
     }
 
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 4cc9641..381298c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(
 
   def run(sqlContext: SQLContext) = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
-
-    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
+    new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 8c80be1..f9c0822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.test
 
+import scala.language.implicitConversions
+
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /** A SQLContext that can be used for local testing. */
 object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
   }
+
+  /**
+   * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
+   * construct SchemaRDD directly out of local data without relying on implicits.
+   */
+  protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
+    new SchemaRDD(this, plan)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
deleted file mode 100644
index c87d762..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import _root_.java.io.File
-
-/* Implicits */
-import org.apache.spark.sql.test.TestSQLContext._
-
-class InsertIntoSuite extends QueryTest {
-  TestData // Initialize TestData
-  import TestData._
-
-  test("insertInto() created parquet file") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-    testFile.registerTempTable("createAndInsertTest")
-
-    // Add some data.
-    testData.insertInto("createAndInsertTest")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
-
-    // Add more data.
-    testData.insertInto("createAndInsertTest")
-
-    // Make sure all data is there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Now overwrite.
-    testData.insertInto("createAndInsertTest", overwrite = true)
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
-
-    testFilePath.delete()
-  }
-
-  test("INSERT INTO parquet table") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-    testFile.registerTempTable("createAndInsertSQLTest")
-
-    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq
-    )
-
-    // Append more data.
-    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure all data is there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq
-    )
-
-    testFilePath.delete()
-  }
-
-  test("Double create fails when allowExisting = false") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
-    intercept[RuntimeException] {
-      createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
-    }
-
-    testFilePath.delete()
-  }
-
-  test("Double create does not fail when allowExisting = true") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
-    createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
-
-    testFilePath.delete()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fe781ec..3a073a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -402,23 +402,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(file)
   }
 
-  test("Insert (overwrite) via Scala API") {
-    val dirname = Utils.createTempDir()
-    val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    source_rdd.registerTempTable("source")
-    val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
-    dest_rdd.registerTempTable("dest")
-    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
-    val rdd_copy1 = sql("SELECT * FROM dest").collect()
-    assert(rdd_copy1.size === 100)
-
-    sql("INSERT INTO dest SELECT * FROM source")
-    val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
-    assert(rdd_copy2.size === 200)
-    Utils.deleteRecursively(dirname)
-  }
-
   test("Insert (appending) to same table via Scala API") {
     sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
@@ -902,15 +885,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(tmpdir)
   }
 
-  test("Querying on empty parquet throws exception (SPARK-3536)") {
-    val tmpdir = Utils.createTempDir()
-    Utils.deleteRecursively(tmpdir)
-    createParquetFile[TestRDDEntry](tmpdir.toString()).registerTempTable("tmpemptytable")
-    val result1 = sql("SELECT * FROM tmpemptytable").collect()
-    assert(result1.size === 0)
-    Utils.deleteRecursively(tmpdir)
-  }
-
   test("read/write fixed-length decimals") {
     for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
       val tempDir = getTempFilePath("parquetTest").getCanonicalPath

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
index daa7ca6..4c081fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
@@ -34,19 +34,6 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
     }
   }
 
-  test("insertion") {
-    withTempDir { dir =>
-      val data = (0 until 10).map(i => (i, i.toString))
-      withParquetTable(data, "t") {
-        createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
-        withTempTable("dest") {
-          sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
-          checkAnswer(table("dest"), data)
-        }
-      }
-    }
-  }
-
   test("appending") {
     val data = (0 until 10).map(i => (i, i.toString))
     withParquetTable(data, "t") {
@@ -98,13 +85,4 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
       checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
     }
   }
-
-  test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
-    withTempDir { dir =>
-      createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
-      withTempTable("t") {
-        checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index bf56e60..a9a20a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,7 +71,7 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
 
@@ -348,7 +348,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
 
-    override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
+    override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
       HiveDDLStrategy,

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa397c1/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 52e1f0d..47431ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,7 +102,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution { val logical = plan }
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }



