spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-20431][SS][FOLLOWUP] Specify a schema by using a DDL-formatted string in DataStreamReader
Date Sat, 24 Jun 2017 03:39:47 GMT
Repository: spark
Updated Branches:
  refs/heads/master 03eb6117a -> 7525ce98b


[SPARK-20431][SS][FOLLOWUP] Specify a schema by using a DDL-formatted string in DataStreamReader

## What changes were proposed in this pull request?

This PR supports a DDL-formatted string in `DataStreamReader.schema`.
This change lets users define a schema easily without importing the type classes.

For example,

```scala
scala> spark.readStream.schema("col0 INT, col1 DOUBLE").load("/tmp/abc").printSchema()
root
 |-- col0: integer (nullable = true)
 |-- col1: double (nullable = true)
```
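
For comparison, defining the same schema without the DDL string requires importing the type classes and building a `StructType` by hand via the existing `schema(StructType)` overload (a minimal sketch; `/tmp/abc` is just the placeholder path from the example above):

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

// Equivalent StructType for "col0 INT, col1 DOUBLE"
val schema = StructType(
  StructField("col0", IntegerType) ::
  StructField("col1", DoubleType) :: Nil)

spark.readStream.schema(schema).load("/tmp/abc").printSchema()
```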

## How was this patch tested?

Added tests in `DataStreamReaderWriterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18373 from HyukjinKwon/SPARK-20431.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7525ce98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7525ce98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7525ce98

Branch: refs/heads/master
Commit: 7525ce98b4575b1ac4e44cc9b3a5773f03eba19e
Parents: 03eb611
Author: hyukjinkwon <gurwls223@gmail.com>
Authored: Sat Jun 24 11:39:41 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Sat Jun 24 11:39:41 2017 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  2 ++
 python/pyspark/sql/streaming.py                 | 24 +++++++++++++-------
 .../spark/sql/streaming/DataStreamReader.scala  | 12 ++++++++++
 .../test/DataStreamReaderWriterSuite.scala      | 12 ++++++++++
 4 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7525ce98/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index aef71f9..7279173 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -98,6 +98,8 @@ class DataFrameReader(OptionUtils):
 
         :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
                        (For example ``col0 INT, col1 DOUBLE``).
+
+        >>> s = spark.read.schema("col0 INT, col1 DOUBLE")
         """
         from pyspark.sql import SparkSession
         spark = SparkSession.builder.getOrCreate()

http://git-wip-us.apache.org/repos/asf/spark/blob/7525ce98/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 58aa246..5bbd70c 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -319,16 +319,21 @@ class DataStreamReader(OptionUtils):
 
         .. note:: Evolving.
 
-        :param schema: a :class:`pyspark.sql.types.StructType` object
+        :param schema: a :class:`pyspark.sql.types.StructType` object or a DDL-formatted string
+                       (For example ``col0 INT, col1 DOUBLE``).
 
         >>> s = spark.readStream.schema(sdf_schema)
+        >>> s = spark.readStream.schema("col0 INT, col1 DOUBLE")
         """
         from pyspark.sql import SparkSession
-        if not isinstance(schema, StructType):
-            raise TypeError("schema should be StructType")
         spark = SparkSession.builder.getOrCreate()
-        jschema = spark._jsparkSession.parseDataType(schema.json())
-        self._jreader = self._jreader.schema(jschema)
+        if isinstance(schema, StructType):
+            jschema = spark._jsparkSession.parseDataType(schema.json())
+            self._jreader = self._jreader.schema(jschema)
+        elif isinstance(schema, basestring):
+            self._jreader = self._jreader.schema(schema)
+        else:
+            raise TypeError("schema should be StructType or string")
         return self
 
     @since(2.0)
@@ -372,7 +377,8 @@ class DataStreamReader(OptionUtils):
 
         :param path: optional string for file-system backed data sources.
         :param format: optional string for format of the data source. Default to 'parquet'.
-        :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema.
+        :param schema: optional :class:`pyspark.sql.types.StructType` for the input schema
+                       or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param options: all other string options
 
         >>> json_sdf = spark.readStream.format("json") \\
@@ -415,7 +421,8 @@ class DataStreamReader(OptionUtils):
 
         :param path: string represents path to the JSON dataset,
                      or RDD of Strings storing JSON objects.
-        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
+        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
+                       or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param primitivesAsString: infers all primitive values as a string type. If None is set,
                                    it uses the default value, ``false``.
         :param prefersDecimal: infers all floating-point values as a decimal type. If the values
@@ -542,7 +549,8 @@ class DataStreamReader(OptionUtils):
         .. note:: Evolving.
 
         :param path: string, or list of strings, for input path(s).
-        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
+        :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
+                       or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
         :param sep: sets the single character as a separator for each field and value.
                     If None is set, it uses the default value, ``,``.
         :param encoding: decodes the CSV files by the given encoding type. If None is set,

http://git-wip-us.apache.org/repos/asf/spark/blob/7525ce98/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7e8e639..70ddfa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -60,6 +60,18 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   }
 
   /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) can
+   * infer the input schema automatically from data. By specifying the schema here, the underlying
+   * data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 2.3.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
+    this
+  }
+
+  /**
    * Adds an input option for the underlying data source.
    *
    * You can set the following option(s):

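With this overload in place, the streaming reader accepts a DDL string directly from Scala as well. A minimal usage sketch (assuming a `SparkSession` named `spark` and a hypothetical JSON input directory `/tmp/events`):

```scala
// The DDL string is parsed via StructType.fromDDL, so the underlying
// source can skip schema inference on the (hypothetical) input directory.
val events = spark.readStream
  .format("json")
  .schema("col0 INT, col1 DOUBLE")
  .load("/tmp/events")
```

This mirrors the DDL-string overload added to the batch `DataFrameReader.schema` in SPARK-20431.
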
http://git-wip-us.apache.org/repos/asf/spark/blob/7525ce98/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index b5f1e28..3de0ae6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -663,4 +663,16 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
     assert(fs.exists(checkpointDir))
   }
+
+  test("SPARK-20431: Specify a schema by using a DDL-formatted string") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .schema("aa INT")
+      .load()
+
+    assert(LastOptions.schema.isDefined)
+    assert(LastOptions.schema.get === StructType(StructField("aa", IntegerType) :: Nil))
+
+    LastOptions.clear()
+  }
 }

