spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [1/3] spark git commit: [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
Date Sat, 14 Feb 2015 07:03:34 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 db5747921 -> ba91bf5f4


http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 89b18c3..9fcb04c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -37,7 +37,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   val testData = TestHive.sparkContext.parallelize(
-    (1 to 100).map(i => TestData(i, i.toString)))
+    (1 to 100).map(i => TestData(i, i.toString))).toDF
 
   before {
     // Since every we are doing tests for DDL statements,
@@ -56,7 +56,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     // Make sure the table has also been updated.
     checkAnswer(
       sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq.map(Row.fromTuple)
+      testData.collect().toSeq
     )
 
     // Add more data.
@@ -65,7 +65,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     // Make sure the table has been updated.
     checkAnswer(
       sql("SELECT * FROM createAndInsertTest"),
-      testData.toDataFrame.collect().toSeq ++ testData.toDataFrame.collect().toSeq
+      testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
     )
 
     // Now overwrite.
@@ -74,7 +74,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     // Make sure the registered table has also been updated.
     checkAnswer(
       sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq.map(Row.fromTuple)
+      testData.collect().toSeq
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 068aa03..321b784 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -29,7 +29,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfterAll {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   val df =
-    sparkContext.parallelize((1 to 10).map(i => (i,s"str$i"))).toDataFrame("key", "value")
+    sparkContext.parallelize((1 to 10).map(i => (i,s"str$i"))).toDF("key", "value")
 
   override def beforeAll(): Unit = {
     // The catalog in HiveContext is a case insensitive one.

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2916724..addf887 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -28,17 +28,14 @@ import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql._
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
-
-/* Implicits */
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
 
 /**
  * Tests for persisting tables created though the data sources API into the metastore.
  */
 class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
-  import org.apache.spark.sql.hive.test.TestHive.implicits._
-
   override def afterEach(): Unit = {
     reset()
     if (tempPath.exists()) Utils.deleteRecursively(tempPath)
@@ -154,7 +151,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   test("check change without refresh") {
     val tempDir = File.createTempFile("sparksql", "json")
     tempDir.delete()
-    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+    sparkContext.parallelize(("a", "b") :: Nil).toDF
+      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
 
     sql(
       s"""
@@ -170,7 +168,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       Row("a", "b"))
 
     FileUtils.deleteDirectory(tempDir)
-    sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+    sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF
+      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
 
     // Schema is cached so the new column does not show. The updated values in existing columns
     // will show.
@@ -190,7 +189,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   test("drop, change, recreate") {
     val tempDir = File.createTempFile("sparksql", "json")
     tempDir.delete()
-    sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+    sparkContext.parallelize(("a", "b") :: Nil).toDF
+      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
 
     sql(
       s"""
@@ -206,7 +206,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       Row("a", "b"))
 
     FileUtils.deleteDirectory(tempDir)
-    sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+    sparkContext.parallelize(("a", "b", "c") :: Nil).toDF
+      .toJSON.saveAsTextFile(tempDir.getCanonicalPath)
 
     sql("DROP TABLE jsonTable")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 405b200..d01dbf8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -567,7 +567,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       TestHive.sparkContext.parallelize(
         TestData(1, "str1") ::
         TestData(2, "str2") :: Nil)
-    testData.registerTempTable("REGisteredTABle")
+    testData.toDF.registerTempTable("REGisteredTABle")
 
     assertResult(Array(Row(2, "str2"))) {
       sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " +
@@ -592,7 +592,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") {
     val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
       .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)}
-    TestHive.sparkContext.parallelize(fixture).registerTempTable("having_test")
+    TestHive.sparkContext.parallelize(fixture).toDF.registerTempTable("having_test")
     val results =
       sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
       .collect()
@@ -740,7 +740,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       TestHive.sparkContext.parallelize(
         TestData(1, "str1") ::
         TestData(1, "str2") :: Nil)
-    testData.registerTempTable("test_describe_commands2")
+    testData.toDF.registerTempTable("test_describe_commands2")
 
     assertResult(
       Array(
@@ -900,8 +900,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   }
 
   test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
-    sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
-    sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
+    sparkContext.makeRDD(Seq.empty[LogEntry]).toDF.registerTempTable("rawLogs")
+    sparkContext.makeRDD(Seq.empty[LogFile]).toDF.registerTempTable("logFiles")
 
     sql(
       """

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 029c36a..6fc4cc1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -77,7 +77,7 @@ class HiveResolutionSuite extends HiveComparisonTest {
   test("case insensitivity with scala reflection") {
     // Test resolution with Scala Reflection
     sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
-      .registerTempTable("caseSensitivityTest")
+      .toDF.registerTempTable("caseSensitivityTest")
 
     val query = sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
     assert(query.schema.fields.map(_.name) === Seq("a", "b", "A", "B", "a", "b", "A", "B"),
@@ -88,14 +88,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
   ignore("case insensitivity with scala reflection joins") {
     // Test resolution with Scala Reflection
     sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
-      .registerTempTable("caseSensitivityTest")
+      .toDF.registerTempTable("caseSensitivityTest")
 
     sql("SELECT * FROM casesensitivitytest a JOIN casesensitivitytest b ON a.a = b.a").collect()
   }
 
   test("nested repeated resolution") {
     sparkContext.parallelize(Data(1, 2, Nested(1,2), Seq(Nested(1,2))) :: Nil)
-      .registerTempTable("nestedRepeatedTest")
+      .toDF.registerTempTable("nestedRepeatedTest")
     assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 8fb5e05..ab53c63 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
 
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 1e99003..245161d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -111,7 +111,7 @@ class HiveUdfSuite extends QueryTest {
   
   test("UDFIntegerToString") {
     val testData = TestHive.sparkContext.parallelize(
-      IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil)
+      IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF
     testData.registerTempTable("integerTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'")
@@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest {
     val testData = TestHive.sparkContext.parallelize(
       ListListIntCaseClass(Nil) ::
       ListListIntCaseClass(Seq((1, 2, 3))) ::
-      ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil)
+      ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF
     testData.registerTempTable("listListIntTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'")
@@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest {
   test("UDFListString") {
     val testData = TestHive.sparkContext.parallelize(
       ListStringCaseClass(Seq("a", "b", "c")) ::
-      ListStringCaseClass(Seq("d", "e")) :: Nil)
+      ListStringCaseClass(Seq("d", "e")) :: Nil).toDF
     testData.registerTempTable("listStringTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'")
@@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest {
 
   test("UDFStringString") {
     val testData = TestHive.sparkContext.parallelize(
-      StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil)
+      StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF
     testData.registerTempTable("stringTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'")
@@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest {
       ListListIntCaseClass(Nil) ::
       ListListIntCaseClass(Seq((1, 2, 3))) ::
       ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
-      Nil)
+      Nil).toDF
     testData.registerTempTable("TwoListTable")
 
     sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9a6e865..9788259 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.hive.HiveShim
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
 
@@ -34,9 +35,6 @@ case class Nested3(f3: Int)
  */
 class SQLQuerySuite extends QueryTest {
 
-  import org.apache.spark.sql.hive.test.TestHive.implicits._
-  val sqlCtx = TestHive
-
   test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {
     checkAnswer(
       sql("SELECT * FROM (SELECT key + key AS a FROM src SORT BY value) t ORDER BY t.a"),
@@ -176,7 +174,8 @@ class SQLQuerySuite extends QueryTest {
   }
 
   test("double nested data") {
-    sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil).registerTempTable("nested")
+    sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
+      .toDF().registerTempTable("nested")
     checkAnswer(
       sql("SELECT f1.f2.f3 FROM nested"),
       Row(1))
@@ -199,7 +198,7 @@ class SQLQuerySuite extends QueryTest {
   }
 
   test("SPARK-4825 save join to table") {
-    val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
+    val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
     sql("CREATE TABLE test1 (key INT, value STRING)")
     testData.insertInto("test1")
     sql("CREATE TABLE test2 (key INT, value STRING)")
@@ -279,7 +278,7 @@ class SQLQuerySuite extends QueryTest {
 
     val rowRdd = sparkContext.parallelize(row :: Nil)
 
-    sqlCtx.createDataFrame(rowRdd, schema).registerTempTable("testTable")
+    TestHive.createDataFrame(rowRdd, schema).registerTempTable("testTable")
 
     sql(
       """CREATE TABLE nullValuesInInnerComplexTypes

http://git-wip-us.apache.org/repos/asf/spark/blob/ba91bf5f/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index a7479a5..e246cbb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -152,7 +154,6 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
 
-  import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -167,12 +168,14 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       val partDir = new File(partitionedTableDir, s"p=$p")
       sparkContext.makeRDD(1 to 10)
         .map(i => ParquetData(i, s"part-$p"))
+        .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
     sparkContext
       .makeRDD(1 to 10)
       .map(i => ParquetData(i, s"part-1"))
+      .toDF()
       .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
 
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
@@ -183,6 +186,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       val partDir = new File(partitionedTableDirWithKey, s"p=$p")
       sparkContext.makeRDD(1 to 10)
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message