Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 25874184CE for ; Sat, 23 May 2015 16:48:26 +0000 (UTC) Received: (qmail 74759 invoked by uid 500); 23 May 2015 16:48:26 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 74740 invoked by uid 500); 23 May 2015 16:48:26 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 74730 invoked by uid 99); 23 May 2015 16:48:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 May 2015 16:48:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2945E0542; Sat, 23 May 2015 16:48:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yhuai@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-7654] [SQL] Move insertInto into reader/writer interface. Date: Sat, 23 May 2015 16:48:25 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master a4df0f2d8 -> 2b7e63585 [SPARK-7654] [SQL] Move insertInto into reader/writer interface. This one continues the work of https://github.com/apache/spark/pull/6216. Author: Yin Huai Author: Reynold Xin Closes #6366 from yhuai/insert and squashes the following commits: 3d717fb [Yin Huai] Use insertInto to handle the case when table exists and Append is used for saveAsTable. 56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer. c636e35 [Yin Huai] Remove unnecessary empty lines. cf83837 [Yin Huai] Move insertInto to write. 
Also, remove the partition columns from InsertIntoHadoopFsRelation. 0841a54 [Reynold Xin] Removed experimental tag for deprecated methods. 33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b7e6358 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b7e6358 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b7e6358 Branch: refs/heads/master Commit: 2b7e63585d61be2dab78b70af3867cda3983d5b1 Parents: a4df0f2 Author: Yin Huai Authored: Sat May 23 09:48:20 2015 -0700 Committer: Yin Huai Committed: Sat May 23 09:48:20 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/DataFrame.scala | 52 +++++++-------- .../org/apache/spark/sql/DataFrameReader.scala | 18 +----- .../org/apache/spark/sql/DataFrameWriter.scala | 66 +++++++++++++++++--- .../spark/sql/parquet/ParquetTableSupport.scala | 2 +- .../spark/sql/sources/DataSourceStrategy.scala | 5 +- .../org/apache/spark/sql/sources/commands.scala | 2 +- .../org/apache/spark/sql/sources/ddl.scala | 1 - .../org/apache/spark/sql/sources/rules.scala | 19 +++++- .../org/apache/spark/sql/hive/HiveContext.scala | 4 ++ .../sql/hive/InsertIntoHiveTableSuite.scala | 6 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 8 +-- .../sql/hive/execution/SQLQuerySuite.scala | 8 +-- .../apache/spark/sql/hive/parquetSuites.scala | 4 +- .../sql/sources/hadoopFsRelationSuites.scala | 10 --- 14 files changed, 116 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3ec1c4a..f968577 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1395,28 +1395,6 @@ class DataFrame private[sql]( def write: DataFrameWriter = new DataFrameWriter(this) /** - * :: Experimental :: - * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. - * @group output - * @since 1.3.0 - */ - @Experimental - def insertInto(tableName: String, overwrite: Boolean): Unit = { - sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)), - Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd - } - - /** - * :: Experimental :: - * Adds the rows from this RDD to the specified table. - * Throws an exception if the table already exists. - * @group output - * @since 1.3.0 - */ - @Experimental - def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false) - - /** * Returns the content of the [[DataFrame]] as a RDD of JSON strings. * @group rdd * @since 1.3.0 @@ -1551,13 +1529,7 @@ class DataFrame private[sql]( */ @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0") def saveAsTable(tableName: String, mode: SaveMode): Unit = { - if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) { - // If table already exists and the save mode is Append, - // we will just call insertInto to append the contents of this DataFrame. - insertInto(tableName, overwrite = false) - } else { - write.mode(mode).saveAsTable(tableName) - } + write.mode(mode).saveAsTable(tableName) } /** @@ -1713,9 +1685,29 @@ class DataFrame private[sql]( write.format(source).mode(mode).options(options).save() } + + /** + * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. 
+ * @group output + */ + @deprecated("Use write.mode(SaveMode.Append|SaveMode.Overwrite).saveAsTable(tableName)", "1.4.0") + def insertInto(tableName: String, overwrite: Boolean): Unit = { + write.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append).insertInto(tableName) + } + + /** + * Adds the rows from this RDD to the specified table. + * Throws an exception if the table already exists. + * @group output + */ + @deprecated("Use write.mode(SaveMode.Append).saveAsTable(tableName)", "1.4.0") + def insertInto(tableName: String): Unit = { + write.mode(SaveMode.Append).insertInto(tableName) + } + //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// - // End of eeprecated methods + // End of deprecated methods //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 381c10f..b44d4c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -95,20 +95,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { } /** - * Specifies the input partitioning. If specified, the underlying data source does not need to - * discover the data partitioning scheme, and thus can speed up very large inputs. - * - * This is only applicable for Parquet at the moment. 
- * - * @since 1.4.0 - */ - @scala.annotation.varargs - def partitionBy(colNames: String*): DataFrameReader = { - this.partitioningColumns = Option(colNames) - this - } - - /** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * @@ -128,7 +114,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), + partitionColumns = Array.empty[String], provider = source, options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation)) @@ -300,6 +286,4 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { private var extraOptions = new scala.collection.mutable.HashMap[String, String] - private var partitioningColumns: Option[Seq[String]] = None - } http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f2e721d..5548b26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql import java.util.Properties import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils} import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect} @@ -149,21 +151,65 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** + * Inserts the content of the 
[[DataFrame]] to the specified table. It requires that + * the schema of the [[DataFrame]] is the same as the schema of the table. + * + * Because it inserts data to an existing table, format or options will be ignored. + * + * @since 1.4.0 + */ + def insertInto(tableName: String): Unit = { + val partitions = + partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap) + val overwrite = (mode == SaveMode.Overwrite) + df.sqlContext.executePlan(InsertIntoTable( + UnresolvedRelation(Seq(tableName)), + partitions.getOrElse(Map.empty[String, Option[String]]), + df.logicalPlan, + overwrite, + ifNotExists = false)).toRdd + } + + /** * Saves the content of the [[DataFrame]] as the specified table. * + * In the case the table already exists, behavior of this function depends on the + * save mode, specified by the `mode` function (default to throwing an exception). + * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be + * the same as that of the existing table. + * When `mode` is `Append`, the schema of the [[DataFrame]] need to be + * the same as that of the existing table, and format or options will be ignored. + * * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { - val cmd = - CreateTableUsingAsSelect( - tableName, - source, - temporary = false, - partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), - mode, - extraOptions.toMap, - df.logicalPlan) - df.sqlContext.executePlan(cmd).toRdd + if (df.sqlContext.catalog.tableExists(tableName :: Nil) && mode != SaveMode.Overwrite) { + mode match { + case SaveMode.Ignore => + // Do nothing + + case SaveMode.ErrorIfExists => + throw new AnalysisException(s"Table $tableName already exists.") + + case SaveMode.Append => + // If it is Append, we just ask insertInto to handle it. We will not use insertInto + // to handle saveAsTable with Overwrite because saveAsTable can change the schema of + // the table. 
But, insertInto with Overwrite requires the schema of data be the same + // the schema of the table. + insertInto(tableName) + } + } else { + val cmd = + CreateTableUsingAsSelect( + tableName, + source, + temporary = false, + partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), + mode, + extraOptions.toMap, + df.logicalPlan) + df.sqlContext.executePlan(cmd).toRdd + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index c45c431..70a220c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -129,7 +129,7 @@ private[parquet] object RowReadSupport { } /** - * A `parquet.hadoop.api.WriteSupport` for Row ojects. + * A `parquet.hadoop.api.WriteSupport` for Row objects. 
*/ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index c03649d..dacd967 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -105,10 +105,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty => + l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - execution.ExecutedCommand( - InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil + execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil case _ => Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 498f753..c3674a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -61,7 +61,6 @@ private[sql] case class InsertIntoDataSource( private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: 
LogicalPlan, - partitionColumns: Array[String], mode: SaveMode) extends RunnableCommand { @@ -100,6 +99,7 @@ private[sql] case class InsertIntoHadoopFsRelation( relation.schema, needsConversion = false) + val partitionColumns = relation.partitionColumns.fieldNames if (partitionColumns.isEmpty) { insert(new DefaultWriterContainer(relation, job), df) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 5e72312..ca30b8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -335,7 +335,6 @@ private[sql] object ResolvedDataSource { InsertIntoHadoopFsRelation( r, project, - partitionColumns.toArray, mode)).toRdd r case _ => http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index ab33125..a3fd7f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -35,9 +35,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { // Wait until children are resolved. case p: LogicalPlan if !p.childrenResolved => p - // We are inserting into an InsertableRelation. + // We are inserting into an InsertableRelation or HadoopFsRelation. 
case i @ InsertIntoTable( - l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => { + l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => { // First, make sure the data to be inserted have the same number of fields with the // schema of the relation. if (l.output.size != child.output.size) { @@ -101,7 +101,20 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } } - case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) => + // We need to make sure the partition columns specified by users do match partition + // columns of the relation. + val existingPartitionColumns = r.partitionColumns.fieldNames.toSet + val specifiedPartitionColumns = part.keySet + if (existingPartitionColumns != specifiedPartitionColumns) { + failAnalysis(s"Specified partition columns " + + s"(${specifiedPartitionColumns.mkString(", ")}) " + + s"do not match the partition columns of the table. Please use " + + s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") + } else { + // OK + } + case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => // The relation in l is not an InsertableRelation. 
failAnalysis(s"$l does not allow insertion.") http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a8e8e70..0d807f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -373,6 +373,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { ResolveHiveWindowFunction :: sources.PreInsertCastAndRename :: Nil + + override val extendedCheckRules = Seq( + sources.PreWriteCheck(catalog) + ) } override protected[sql] def createSession(): SQLSession = { http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index ecb990e..acf2f7d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -53,7 +53,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { sql("CREATE TABLE createAndInsertTest (key int, value string)") // Add some data. - testData.insertInto("createAndInsertTest") + testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest") // Make sure the table has also been updated. checkAnswer( @@ -62,7 +62,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { ) // Add more data. 
- testData.insertInto("createAndInsertTest") + testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest") // Make sure the table has been updated. checkAnswer( @@ -71,7 +71,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { ) // Now overwrite. - testData.insertInto("createAndInsertTest", overwrite = true) + testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest") // Make sure the registered table has also been updated. checkAnswer( http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c4c7b63..9623ef0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -608,7 +608,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructType( StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil) assert(df2.schema === expectedSchema2) - df2.insertInto("arrayInParquet", overwrite = false) + df2.write.mode(SaveMode.Append).insertInto("arrayInParquet") createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto. 
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write @@ -642,7 +642,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructType( StructField("a", mapType2, nullable = true) :: Nil) assert(df2.schema === expectedSchema2) - df2.insertInto("mapInParquet", overwrite = false) + df2.write.mode(SaveMode.Append).insertInto("mapInParquet") createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) .saveAsTable("mapInParquet") // This one internally calls df2.insertInto. createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write @@ -768,7 +768,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), (6 to 34).map(i => Row(i, s"str$i"))) - createDF(40, 49).insertInto("insertParquet") + createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), (6 to 44).map(i => Row(i, s"str$i"))) @@ -782,7 +782,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT p.c1, c2 FROM insertParquet p"), (50 to 59).map(i => Row(i, s"str$i"))) - createDF(70, 79).insertInto("insertParquet", overwrite = true) + createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p"), (70 to 79).map(i => Row(i, s"str$i"))) http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ba53ed9..b707f5e 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} +import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -425,10 +425,10 @@ class SQLQuerySuite extends QueryTest { test("SPARK-4825 save join to table") { val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() sql("CREATE TABLE test1 (key INT, value STRING)") - testData.insertInto("test1") + testData.write.mode(SaveMode.Append).insertInto("test1") sql("CREATE TABLE test2 (key INT, value STRING)") - testData.insertInto("test2") - testData.insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") checkAnswer( table("test"), http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 223ba65..7851f38 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -316,7 +316,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = 
sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + @@ -346,7 +346,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." 
+ http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index c7c8bcd..3222690 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -362,16 +362,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { .partitionBy("p1") .saveAsTable("t") } - - // Using different order of partition columns - intercept[Throwable] { - partitionedTestDF2.write - .format(dataSourceName) - .mode(SaveMode.Append) - .option("dataSchema", dataSchema.json) - .partitionBy("p2", "p1") - .saveAsTable("t") - } } test("saveAsTable()/load() - partitioned table - ErrorIfExists") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org