spark-commits mailing list archives

From: l...@apache.org
Subject: spark git commit: [SPARK-9100] [SQL] Adds DataFrame reader/writer shortcut methods for ORC
Date: Tue, 21 Jul 2015 07:08:58 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1ddd0f2f1 -> d38c5029a


[SPARK-9100] [SQL] Adds DataFrame reader/writer shortcut methods for ORC

This PR adds DataFrame reader/writer shortcut methods for ORC in both Scala and Python.
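
For illustration, a minimal before/after sketch of the Scala side (the hiveContext value, the df DataFrame, and the output path are assumptions for this example, not part of the commit; ORC support is currently only available together with HiveContext):

    // Before: go through the generic data source API
    df.write.format("orc").save("/tmp/orc-demo")
    val viaFormat = hiveContext.read.format("orc").load("/tmp/orc-demo")

    // With this commit: dedicated shortcut methods
    df.write.orc("/tmp/orc-demo")
    val viaShortcut = hiveContext.read.orc("/tmp/orc-demo")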

Author: Cheng Lian <lian@databricks.com>

Closes #7444 from liancheng/spark-9100 and squashes the following commits:

284d043 [Cheng Lian] Fixes PySpark test cases and addresses PR comments
e0b09fb [Cheng Lian] Adds DataFrame reader/writer shortcut methods for ORC


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d38c5029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d38c5029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d38c5029

Branch: refs/heads/master
Commit: d38c5029a2ca845e2782096044a6412b653c9f95
Parents: 1ddd0f2
Author: Cheng Lian <lian@databricks.com>
Authored: Tue Jul 21 15:08:44 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Tue Jul 21 15:08:44 2015 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  44 +++++++++++++++++--
 .../sql/orc_partitioned/._SUCCESS.crc           | Bin 0 -> 8 bytes
 .../test_support/sql/orc_partitioned/_SUCCESS   |   0
 ...829af031-b970-49d6-ad39-30460a0be2c8.orc.crc | Bin 0 -> 12 bytes
 ...000-829af031-b970-49d6-ad39-30460a0be2c8.orc | Bin 0 -> 168 bytes
 ...829af031-b970-49d6-ad39-30460a0be2c8.orc.crc | Bin 0 -> 12 bytes
 ...000-829af031-b970-49d6-ad39-30460a0be2c8.orc | Bin 0 -> 168 bytes
 .../org/apache/spark/sql/DataFrameReader.scala  |   9 ++++
 .../org/apache/spark/sql/DataFrameWriter.scala  |  12 +++++
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   3 +-
 .../hive/orc/OrcPartitionDiscoverySuite.scala   |  14 +++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  12 ++---
 .../org/apache/spark/sql/hive/orc/OrcTest.scala |   8 ++--
 13 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 882a030..dea8bad 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -146,14 +146,28 @@ class DataFrameReader(object):
         return self._df(self._jreader.table(tableName))
 
     @since(1.4)
-    def parquet(self, *path):
+    def parquet(self, *paths):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
         >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
-        return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
+        return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, paths)))
+
+    @since(1.5)
+    def orc(self, path):
+        """
+        Loads an ORC file, returning the result as a :class:`DataFrame`.
+
+        ::Note: Currently ORC support is only available together with
+        :class:`HiveContext`.
+
+        >>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> df.dtypes
+        [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
+        """
+        return self._df(self._jreader.orc(path))
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
@@ -378,6 +392,29 @@ class DataFrameWriter(object):
             self.partitionBy(partitionBy)
         self._jwrite.parquet(path)
 
+    def orc(self, path, mode=None, partitionBy=None):
+        """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
+
+        ::Note: Currently ORC support is only available together with
+        :class:`HiveContext`.
+
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+        :param partitionBy: names of partitioning columns
+
+        >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
+        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
+        """
+        self.mode(mode)
+        if partitionBy is not None:
+            self.partitionBy(partitionBy)
+        self._jwrite.orc(path)
+
     @since(1.4)
     def jdbc(self, url, table, mode=None, properties={}):
         """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
@@ -408,7 +445,7 @@ def _test():
     import os
     import tempfile
     from pyspark.context import SparkContext
-    from pyspark.sql import Row, SQLContext
+    from pyspark.sql import Row, SQLContext, HiveContext
     import pyspark.sql.readwriter
 
     os.chdir(os.environ["SPARK_HOME"])
@@ -420,6 +457,7 @@ def _test():
     globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
+    globs['hiveContext'] = HiveContext(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
 
     (failure_count, test_count) = doctest.testmod(

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/._SUCCESS.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/._SUCCESS.crc b/python/test_support/sql/orc_partitioned/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/python/test_support/sql/orc_partitioned/._SUCCESS.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/_SUCCESS
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/_SUCCESS b/python/test_support/sql/orc_partitioned/_SUCCESS
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc b/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
new file mode 100644
index 0000000..834cf0b
Binary files /dev/null and b/python/test_support/sql/orc_partitioned/b=0/c=0/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc b/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
new file mode 100755
index 0000000..4943801
Binary files /dev/null and b/python/test_support/sql/orc_partitioned/b=0/c=0/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc b/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc
new file mode 100644
index 0000000..693dcee
Binary files /dev/null and b/python/test_support/sql/orc_partitioned/b=1/c=1/.part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc b/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc
new file mode 100755
index 0000000..4cbb95a
Binary files /dev/null and b/python/test_support/sql/orc_partitioned/b=1/c=1/part-r-00000-829af031-b970-49d6-ad39-30460a0be2c8.orc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0e37ad3..f1c1ddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -265,6 +265,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
+   * Loads an ORC file and returns the result as a [[DataFrame]].
+   *
+   * @param path input path
+   * @since 1.5.0
+   * @note Currently, this method can only be used together with `HiveContext`.
+   */
+  def orc(path: String): DataFrame = format("orc").load(path)
+
+  /**
    * Returns the specified table as a [[DataFrame]].
    *
    * @since 1.4.0

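As a usage sketch for the new reader shortcut (the hiveContext value and the temporary table name are assumptions for this example), reading a partitioned ORC directory such as the Python test data above also picks up the partition columns; the test suites further down make exactly this switch from read.format("orc").load(...) to read.orc(...):

    val df = hiveContext.read.orc("python/test_support/sql/orc_partitioned")
    // Schema contains the data column a plus the discovered partition columns b and c
    df.registerTempTable("orc_demo")
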
http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5548b26..3e7b9cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -280,6 +280,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   def parquet(path: String): Unit = format("parquet").save(path)
 
+  /**
+   * Saves the content of the [[DataFrame]] in ORC format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("orc").save(path)
+   * }}}
+   *
+   * @since 1.5.0
+   * @note Currently, this method can only be used together with `HiveContext`.
+   */
+  def orc(path: String): Unit = format("orc").save(path)
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////

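On the writer side, a short sketch (the output path and partition columns are illustrative): since orc(path) simply delegates to format("orc").save(path), it composes with the existing builder options, which is how the test suites below invoke it:

    import org.apache.spark.sql.SaveMode

    df.write
      .mode(SaveMode.Overwrite)   // "append", "ignore", and "error" behave as documented above
      .partitionBy("b", "c")      // hypothetical partition columns
      .orc("/tmp/orc-partitioned")
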
http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 080af5b..af3f468 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -41,8 +41,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
           .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
           .toDF("a", "b", "p1")
           .write
-          .format("orc")
-          .save(partitionDir.toString)
+          .orc(partitionDir.toString)
       }
 
       val dataSchemaWithPartition =

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
index 3c2efe3..d463e8f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -49,13 +49,13 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
 
   def makeOrcFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    data.toDF().write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+    data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
   }
 
 
   def makeOrcFile[T <: Product: ClassTag: TypeTag](
       df: DataFrame, path: File): Unit = {
-    df.write.format("orc").mode("overwrite").save(path.getCanonicalPath)
+    df.write.mode("overwrite").orc(path.getCanonicalPath)
   }
 
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
@@ -90,7 +90,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+      read.orc(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -137,7 +137,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
+      read.orc(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -187,9 +187,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
       }
 
       read
-        .format("orc")
         .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
-        .load(base.getCanonicalPath)
+        .orc(base.getCanonicalPath)
         .registerTempTable("t")
 
       withTempTable("t") {
@@ -230,9 +229,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
       }
 
       read
-        .format("orc")
         .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
-        .load(base.getCanonicalPath)
+        .orc(base.getCanonicalPath)
         .registerTempTable("t")
 
       withTempTable("t") {

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index ca131fa..744d462 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -63,14 +63,14 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        sqlContext.read.format("orc").load(file),
+        sqlContext.read.orc(file),
         data.toDF().collect())
     }
   }
 
   test("Read/write binary data") {
     withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
-      val bytes = read.format("orc").load(file).head().getAs[Array[Byte]](0)
+      val bytes = read.orc(file).head().getAs[Array[Byte]](0)
       assert(new String(bytes, "utf8") === "test")
     }
   }
@@ -88,7 +88,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        read.format("orc").load(file),
+        read.orc(file),
         data.toDF().collect())
     }
   }
@@ -158,7 +158,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
     withOrcFile(data) { file =>
       checkAnswer(
-        read.format("orc").load(file),
+        read.orc(file),
         Row(Seq.fill(5)(null): _*))
     }
   }
@@ -310,7 +310,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
              """.stripMargin)
 
           val errorMessage = intercept[AnalysisException] {
-            sqlContext.read.format("orc").load(path)
+            sqlContext.read.orc(path)
           }.getMessage
 
           assert(errorMessage.contains("Failed to discover schema from ORC files"))
@@ -323,7 +323,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
                |SELECT key, value FROM single
              """.stripMargin)
 
-          val df = sqlContext.read.format("orc").load(path)
+          val df = sqlContext.read.orc(path)
           assert(df.schema === singleRowDF.schema.asNullable)
           checkAnswer(df, singleRowDF)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/d38c5029/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
index 5daf691..9d76d65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
@@ -39,7 +39,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
       (data: Seq[T])
       (f: String => Unit): Unit = {
     withTempPath { file =>
-      sparkContext.parallelize(data).toDF().write.format("orc").save(file.getCanonicalPath)
+      sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
       f(file.getCanonicalPath)
     }
   }
@@ -51,7 +51,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
   protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: DataFrame => Unit): Unit = {
-    withOrcFile(data)(path => f(sqlContext.read.format("orc").load(path)))
+    withOrcFile(data)(path => f(sqlContext.read.orc(path)))
   }
 
   /**
@@ -70,11 +70,11 @@ private[sql] trait OrcTest extends SQLTestUtils {
 
   protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    data.toDF().write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+    data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
   }
 
   protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
       df: DataFrame, path: File): Unit = {
-    df.write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
+    df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
   }
 }



