spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hol...@apache.org
Subject spark git commit: [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
Date Thu, 09 Mar 2017 19:44:39 GMT
Repository: spark
Updated Branches:
  refs/heads/master 30b18e693 -> cabe1df86


[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc

Beside the issue in spark api, also fix 2 minor issues in pyspark
- support read from multiple input paths for orc
- support read from multiple input paths for text

Author: Jeff Zhang <zjffdu@apache.org>

Closes #10307 from zjffdu/SPARK-12334.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cabe1df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cabe1df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cabe1df8

Branch: refs/heads/master
Commit: cabe1df8606e7e5b9e6efb106045deb3f39f5f13
Parents: 30b18e6
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Mar 9 11:44:34 2017 -0800
Committer: Holden Karau <holden@us.ibm.com>
Committed: Thu Mar 9 11:44:34 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                      | 14 ++++++++------
 python/pyspark/sql/tests.py                           |  5 +++++
 .../scala/org/apache/spark/sql/DataFrameReader.scala  |  6 +++---
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala |  9 +++++++++
 4 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 45fb9b7..4354345 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils):
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              timeZone=None, wholeFile=None):
         """
-        Loads a JSON file and returns the results as a :class:`DataFrame`.
+        Loads JSON files and returns the results as a :class:`DataFrame`.
 
         `JSON Lines <http://jsonlines.org/>`_(newline-delimited JSON) is supported
by default.
         For JSON (one record per file), set the `wholeFile` parameter to ``true``.
@@ -169,7 +169,7 @@ class DataFrameReader(OptionUtils):
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.
 
-        :param path: string represents path to the JSON dataset,
+        :param path: string represents path to the JSON dataset, or a list of paths,
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
         :param primitivesAsString: infers all primitive values as a string type. If None
is set,
@@ -252,7 +252,7 @@ class DataFrameReader(OptionUtils):
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
             return self._df(self._jreader.json(jrdd))
         else:
-            raise TypeError("path can be only string or RDD")
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.4)
     def table(self, tableName):
@@ -269,7 +269,7 @@ class DataFrameReader(OptionUtils):
 
     @since(1.4)
     def parquet(self, *paths):
-        """Loads a Parquet file, returning the result as a :class:`DataFrame`.
+        """Loads Parquet files, returning the result as a :class:`DataFrame`.
 
         You can set the following Parquet-specific option(s) for reading Parquet files:
             * ``mergeSchema``: sets whether we should merge schemas collected from all \
@@ -407,7 +407,7 @@ class DataFrameReader(OptionUtils):
 
     @since(1.5)
     def orc(self, path):
-        """Loads an ORC file, returning the result as a :class:`DataFrame`.
+        """Loads ORC files, returning the result as a :class:`DataFrame`.
 
         .. note:: Currently ORC support is only available together with Hive support.
 
@@ -415,7 +415,9 @@ class DataFrameReader(OptionUtils):
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
-        return self._df(self._jreader.orc(path))
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,

http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1b873e9..f0a9a04 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -450,6 +450,11 @@ class SQLTests(ReusedPySparkTestCase):
                     Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
         self.assertEqual(ages_newlines.collect(), expected)
 
+    def test_read_multiple_orc_file(self):
+        df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
+                                  "python/test_support/sql/orc_partitioned/b=1/c=1"])
+        self.assertEqual(2, df.count())
+
     def test_udf_with_input_file_name(self):
         from pyspark.sql.functions import udf, input_file_name
         from pyspark.sql.types import StringType

http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index a5e38e2..4f4cc93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -262,7 +262,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
   }
 
   /**
-   * Loads a JSON file and returns the results as a `DataFrame`.
+   * Loads JSON files and returns the results as a `DataFrame`.
    *
    * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON)
is supported by
    * default. For JSON (one record per file), set the `wholeFile` option to true.
@@ -438,7 +438,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
   }
 
   /**
-   * Loads a CSV file and returns the result as a `DataFrame`.
+   * Loads CSV files and returns the result as a `DataFrame`.
    *
    * This function will go through the input once to determine the input schema if `inferSchema`
    * is enabled. To avoid going through the entire data once, disable `inferSchema` option
or
@@ -549,7 +549,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends
Logging {
   }
 
   /**
-   * Loads an ORC file and returns the result as a `DataFrame`.
+   * Loads ORC files and returns the result as a `DataFrame`.
    *
    * @param paths input paths
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 38a5477..5d8ba9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -611,4 +612,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest
{
       }
     }
   }
+
+   test("read from multiple orc input paths") {
+     val path1 = Utils.createTempDir()
+     val path2 = Utils.createTempDir()
+     makeOrcFile((1 to 10).map(Tuple1.apply), path1)
+     makeOrcFile((1 to 10).map(Tuple1.apply), path2)
+     assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message