spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [1/2] spark git commit: [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.
Date Sun, 17 May 2015 05:01:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 3b6ef2c53 -> 517eb37a8


http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 8ad3627..3dfa6e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql}
+import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, jsonRDD, sql}
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
 case class Nested(a: Int, B: Int)
@@ -31,14 +31,14 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
 class HiveResolutionSuite extends HiveComparisonTest {
 
   test("SPARK-3698: case insensitive test for nested data") {
-    jsonRDD(sparkContext.makeRDD(
+    read.json(sparkContext.makeRDD(
       """{"a": [{"a": {"a": 1}}]}""" :: Nil)).registerTempTable("nested")
     // This should be successfully analyzed
     sql("SELECT a[0].A.A from nested").queryExecution.analyzed
   }
 
   test("SPARK-5278: check ambiguous reference to fields") {
-    jsonRDD(sparkContext.makeRDD(
+    read.json(sparkContext.makeRDD(
       """{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested")
 
     // there are 2 filed matching field name "b", we should report Ambiguous reference error

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index dfe73c6..ca2c4b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -535,14 +535,14 @@ class SQLQuerySuite extends QueryTest {
 
   test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
     val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}"""
:: Nil)
-    jsonRDD(rdd).registerTempTable("data")
+    read.json(rdd).registerTempTable("data")
     checkAnswer(
       sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
       Row("str-1", 1970))
 
     dropTempTable("data")
 
-    jsonRDD(rdd).registerTempTable("data")
+    read.json(rdd).registerTempTable("data")
     checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
 
     dropTempTable("data")
@@ -550,7 +550,7 @@ class SQLQuerySuite extends QueryTest {
 
   test("resolve udtf with single alias") {
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    jsonRDD(rdd).registerTempTable("data")
+    read.json(rdd).registerTempTable("data")
     val df = sql("SELECT explode(a) AS val FROM data")
     val col = df("val")
   }
@@ -563,7 +563,7 @@ class SQLQuerySuite extends QueryTest {
     // PreInsertionCasts will actually start to work before ImplicitGenerate and then
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    jsonRDD(rdd).registerTempTable("data")
+    read.json(rdd).registerTempTable("data")
     val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
     setConf("spark.sql.hive.convertCTAS", "false")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0075f1..05d9998 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -150,9 +150,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     }
 
     val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-    jsonRDD(rdd1).registerTempTable("jt")
+    read.json(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
-    jsonRDD(rdd2).registerTempTable("jt_array")
+    read.json(rdd2).registerTempTable("jt_array")
 
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
@@ -617,16 +617,16 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
     sql("drop table if exists spark_6016_fix")
 
     // Create a DataFrame with two partitions. So, the created table will have two parquet
files.
-    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
-    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    val df1 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""),
2))
+    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
     checkAnswer(
       sql("select * from spark_6016_fix"),
       (1 to 10).map(i => Row(i))
     )
 
     // Create a DataFrame with four partitions. So, the created table will have four parquet
files.
-    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
-    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    val df2 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""),
4))
+    df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
     // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
     // since the new table has four parquet files, we are trying to read new footers from
two files
     // and then merge metadata in footers of these four (two outdated ones and two latest
one),
@@ -663,7 +663,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
         StructField("a", arrayType1, nullable = true) :: Nil)
     assert(df.schema === expectedSchema1)
 
-    df.saveAsTable("alwaysNullable", "parquet")
+    df.write.format("parquet").saveAsTable("alwaysNullable")
 
     val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
     val arrayType2 = ArrayType(IntegerType, containsNull = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index f44b3c5..9d9b436 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -120,10 +120,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   test("save()/load() - non-partitioned table - ErrorIfExists") {
     withTempDir { file =>
       intercept[RuntimeException] {
-        testDF.save(
-          path = file.getCanonicalPath,
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
+        testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
       }
     }
   }
@@ -233,10 +230,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
   test("save()/load() - partitioned table - Ignore") {
     withTempDir { file =>
-      partitionedTestDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
+      partitionedTestDF.write
+        .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath)
 
       val path = new Path(file.getCanonicalPath)
       val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
@@ -249,11 +244,9 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - non-partitioned table - Overwrite") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
+    testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .saveAsTable("t")
 
     withTable("t") {
       checkAnswer(table("t"), testDF.collect())
@@ -261,15 +254,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - non-partitioned table - Append") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite)
-
-    testDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append)
+    testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t")
+    testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t")
 
     withTable("t") {
       checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
@@ -281,10 +267,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
     withTempTable("t") {
       intercept[AnalysisException] {
-        testDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists)
+        testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
       }
     }
   }
@@ -293,21 +276,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
     Seq.empty[(Int, String)].toDF().registerTempTable("t")
 
     withTempTable("t") {
-      testDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
-
+      testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t")
       assert(table("t").collect().isEmpty)
     }
   }
 
   test("saveAsTable()/load() - partitioned table - simple queries") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
+    partitionedTestDF.write.format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .saveAsTable("t")
 
     withTable("t") {
       checkQueries(table("t"))
@@ -492,11 +470,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
     }
   }
 }
@@ -518,18 +494,16 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
         sparkContext
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
-          .saveAsParquetFile(partitionDir.toString)
+          .write.parquet(partitionDir.toString)
       }
 
       val dataSchemaWithPartition =
         StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
 
       checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchemaWithPartition.json)
+          .load(file.getCanonicalPath))
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message