spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]
Date Tue, 15 Mar 2016 21:58:00 GMT
Repository: spark
Updated Branches:
  refs/heads/master 41eaabf59 -> 643649dcb


[SPARK-13895][SQL] DataFrameReader.text should return Dataset[String]

## What changes were proposed in this pull request?
This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String].

Closes #11731.

## How was this patch tested?
Updated existing integration tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #11739 from rxin/SPARK-13895.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/643649dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/643649dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/643649dc

Branch: refs/heads/master
Commit: 643649dcbfabc5d6952c2ecfb98286324c887665
Parents: 41eaabf
Author: Reynold Xin <rxin@databricks.com>
Authored: Tue Mar 15 14:57:54 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Tue Mar 15 14:57:54 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrameReader.scala    | 12 ++++++++----
 .../test/org/apache/spark/sql/JavaDataFrameSuite.java   |  8 ++++----
 .../sql/execution/datasources/text/TextSuite.scala      |  8 ++++----
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/643649dc/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 57c978b..ef85f1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
-   * Each line in the text file is a new row in the resulting DataFrame. For example:
+   * Loads a text file and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * Each line in the text file is a new row in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   sqlContext.read.text("/path/to/spark/README.md")
@@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * }}}
    *
    * @param paths input path
-   * @since 1.6.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  def text(paths: String*): Dataset[String] = {
+    format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options

http://git-wip-us.apache.org/repos/asf/spark/blob/643649dc/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 4255472..7fe17e0 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -316,14 +316,14 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<Row> df1 = context.read().text(
+    Dataset<String> ds1 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
-    Assert.assertEquals(4L, df1.count());
+    Assert.assertEquals(4L, ds1.count());
 
-    Dataset<Row> df2 = context.read().text(
+    Dataset<String> ds2 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
       Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
-    Assert.assertEquals(5L, df2.count());
+    Assert.assertEquals(5L, ds2.count());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/643649dc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ee39872..47330f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(sqlContext.read.text(testFile))
+    verifyFrame(sqlContext.read.text(testFile).toDF())
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF())
 
     Utils.deleteRecursively(tempFile)
   }
@@ -75,7 +75,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -103,7 +103,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
       } finally {
         // Hadoop 1 doesn't have `Configuration.unset`
         hadoopConfiguration.clear()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message