spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-15458][SQL][STREAMING] Disable schema inference for streaming datasets on file streams
Date Tue, 24 May 2016 21:28:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 31fb5fa40 -> 1fb7b3a0a


[SPARK-15458][SQL][STREAMING] Disable schema inference for streaming datasets on file streams

## What changes were proposed in this pull request?

If the user relies on the schema to be inferred in file streams can break easily for multiple
reasons
- accidentally running on a directory which has no data
- schema changing underneath
- on restart, the query will infer schema again, and may unexpectedly infer incorrect schema,
as the file in the directory may be different at the time of the restart.

To avoid these complicated scenarios, for Spark 2.0, we are going to disable schema inferencing
by default with a config, so that user is forced to consider explicitly what is the schema
it wants, rather than the system trying to infer it and run into weird corner cases.

In this PR, I introduce a SQLConf that determines whether schema inference for file streams
is allowed or not. It is disabled by default.

## How was this patch tested?
Updated unit tests that test error behavior with and without schema inference enabled.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13238 from tdas/SPARK-15458.

(cherry picked from commit e631b819fe348729aab062207a452b8f1d1511bd)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fb7b3a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fb7b3a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fb7b3a0

Branch: refs/heads/branch-2.0
Commit: 1fb7b3a0a2e3a5c5f784aab662df93fcc1449c36
Parents: 31fb5fa
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue May 24 14:27:39 2016 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue May 24 14:27:58 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  11 +
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../sql/streaming/FileStreamSourceSuite.scala   | 239 ++++++++++++-------
 3 files changed, 166 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5dd4d8..d0853f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -186,6 +187,16 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isTextSource = providingClass == classOf[text.DefaultSource]
+        // If the schema inference is disabled, only text sources require schema to be specified
+        if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty)
{
+          throw new IllegalArgumentException(
+            "Schema must be specified when creating a streaming source DataFrame. " +
+              "If some files already exist in the directory, then depending on the file format
" +
+              "you may be able to create a static DataFrame on that directory with " +
+              "'spark.read.load(directory)' and infer schema from it.")
+        }
         SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
 
       case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f3064eb..b91518a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -516,6 +516,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(60 * 1000L) // 10 minutes
 
+  val STREAMING_SCHEMA_INFERENCE =
+    SQLConfigBuilder("spark.sql.streaming.schemaInference")
+      .internal()
+      .doc("Whether file-based streaming sources will infer its own schema")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c97304c..1d784f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -165,19 +166,36 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
       .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
   }
 
+  // ============= Basic parameter exists tests ================
+
   test("FileStreamSource schema: no path") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+    def testError(): Unit = {
+      val e = intercept[IllegalArgumentException] {
+        createFileStreamSourceAndGetSchema(format = None, path = None, schema = None)
+      }
+      assert(e.getMessage.contains("path")) // reason is path, not schema
     }
-    assert("'path' is not specified" === e.getMessage)
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { testError() }
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist") {
-    intercept[AnalysisException] {
+  test("FileStreamSource schema: path doesn't exist, no schema") {
+    val e = intercept[IllegalArgumentException] {
       createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
     }
+    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not
the path
+  }
+
+  test("FileStreamSource schema: path doesn't exist, with schema") {
+    val userSchema = new StructType().add(new StructField("value", IntegerType))
+    val schema = createFileStreamSourceAndGetSchema(
+      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
+    assert(schema === userSchema)
   }
 
+
+  // =============== Text file stream schema tests ================
+
   test("FileStreamSource schema: text, no existing files, no schema") {
     withTempDir { src =>
       val schema = createFileStreamSourceAndGetSchema(
@@ -205,13 +223,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
     }
   }
 
+  // =============== Parquet file stream schema tests ================
+
   test("FileStreamSource schema: parquet, no existing files, no schema") {
     withTempDir { src =>
-      val e = intercept[AnalysisException] {
-        createFileStreamSourceAndGetSchema(
-          format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema
= None)
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val e = intercept[AnalysisException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("parquet"),
+            path = Some(new File(src, "1").getCanonicalPath),
+            schema = None)
+        }
+        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
       }
-      assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
     }
   }
 
@@ -220,9 +244,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write
         .mode(org.apache.spark.sql.SaveMode.Overwrite)
         .parquet(src.getCanonicalPath)
-      val schema = createFileStreamSourceAndGetSchema(
-        format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
-      assert(schema === new StructType().add("value", StringType))
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("value", StringType))
+      }
     }
   }
 
@@ -237,22 +273,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
     }
   }
 
+  // =============== JSON file stream schema tests ================
+
   test("FileStreamSource schema: json, no existing files, no schema") {
     withTempDir { src =>
-      val e = intercept[AnalysisException] {
-        createFileStreamSourceAndGetSchema(
-          format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+        val e = intercept[AnalysisException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        }
+        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
       }
-      assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
     }
   }
 
   test("FileStreamSource schema: json, existing files, no schema") {
     withTempDir { src =>
-      stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
-      val schema = createFileStreamSourceAndGetSchema(
-        format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
-      assert(schema === new StructType().add("c", StringType))
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("c", StringType))
+      }
     }
   }
 
@@ -266,6 +319,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
     }
   }
 
+  // =============== Text file stream tests ================
+
   test("read from text files") {
     withTempDirs { case (src, tmp) =>
       val textStream = createFileStream("text", src.getCanonicalPath)
@@ -284,6 +339,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
     }
   }
 
+  // =============== JSON file stream tests ================
+
   test("read from json files") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
@@ -313,74 +370,82 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
 
   test("read from json files with inferring schema") {
     withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
 
-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
 
-      val fileStream = createFileStream("json", src.getCanonicalPath)
-      assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
+        val fileStream = createFileStream("json", src.getCanonicalPath)
+        assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
 
-      // FileStreamSource should infer the column "c"
-      val filtered = fileStream.filter($"c" contains "keep")
+        // FileStreamSource should infer the column "c"
+        val filtered = fileStream.filter($"c" contains "keep")
 
-      testStream(filtered)(
-        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-        CheckAnswer("keep2", "keep3", "keep5", "keep6")
-      )
+        testStream(filtered)(
+          AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6")
+        )
+      }
     }
   }
 
   test("reading from json files inside partitioned directory") {
     withTempDirs { case (baseSrc, tmp) =>
-      val src = new File(baseSrc, "type=X")
-      src.mkdirs()
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val src = new File(baseSrc, "type=X")
+        src.mkdirs()
 
-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
 
-      val fileStream = createFileStream("json", src.getCanonicalPath)
+        val fileStream = createFileStream("json", src.getCanonicalPath)
 
-      // FileStreamSource should infer the column "c"
-      val filtered = fileStream.filter($"c" contains "keep")
+        // FileStreamSource should infer the column "c"
+        val filtered = fileStream.filter($"c" contains "keep")
 
-      testStream(filtered)(
-        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-        CheckAnswer("keep2", "keep3", "keep5", "keep6")
-      )
+        testStream(filtered)(
+          AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6")
+        )
+      }
     }
   }
 
   test("reading from json files with changing schema") {
     withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
 
-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'k': 'value0'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'k': 'value0'}")
 
-      val fileStream = createFileStream("json", src.getCanonicalPath)
+        val fileStream = createFileStream("json", src.getCanonicalPath)
 
-      // FileStreamSource should infer the column "k"
-      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
 
-      // After creating DF and before starting stream, add data with different schema
-      // Should not affect the inferred schema any more
-      stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
+        // After creating DF and before starting stream, add data with different schema
+        // Should not affect the inferred schema any more
+        stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
 
-      testStream(fileStream)(
+        testStream(fileStream)(
 
-        // Should not pick up column v in the file added before start
-        AddTextFileData("{'k': 'value2'}", src, tmp),
-        CheckAnswer("value0", "value1", "value2"),
+          // Should not pick up column v in the file added before start
+          AddTextFileData("{'k': 'value2'}", src, tmp),
+          CheckAnswer("value0", "value1", "value2"),
 
-        // Should read data in column k, and ignore v
-        AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3"),
+          // Should read data in column k, and ignore v
+          AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),
 
-        // Should ignore rows that do not have the necessary k column
-        AddTextFileData("{'v': 'value4'}", src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3", null))
+          // Should ignore rows that do not have the necessary k column
+          AddTextFileData("{'v': 'value4'}", src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null))
+      }
     }
   }
 
+  // =============== Parquet file stream tests ================
+
   test("read from parquet files") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
@@ -402,49 +467,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
   test("read from parquet files with changing schema") {
 
     withTempDirs { case (src, tmp) =>
-      // Add a file so that we can infer its schema
-      AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
-
-      val fileStream = createFileStream("parquet", src.getCanonicalPath)
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
 
-      // FileStreamSource should infer the column "k"
-      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+        // Add a file so that we can infer its schema
+        AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
 
-      // After creating DF and before starting stream, add data with different schema
-      // Should not affect the inferred schema any more
-      AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+        val fileStream = createFileStream("parquet", src.getCanonicalPath)
 
-      testStream(fileStream)(
-        // Should not pick up column v in the file added before start
-        AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
-        CheckAnswer("value0", "value1", "value2"),
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
 
-        // Should read data in column k, and ignore v
-        AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3"),
+        // After creating DF and before starting stream, add data with different schema
+        // Should not affect the inferred schema any more
+        AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
 
-        // Should ignore rows that do not have the necessary k column
-        AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3", null)
-      )
-    }
-  }
+        testStream(fileStream)(
+          // Should not pick up column v in the file added before start
+          AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
+          CheckAnswer("value0", "value1", "value2"),
 
-  test("file stream source without schema") {
-    withTempDir { src =>
-      // Only "text" doesn't need a schema
-      createFileStream("text", src.getCanonicalPath)
+          // Should read data in column k, and ignore v
+          AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),
 
-      // Both "json" and "parquet" require a schema if no existing file to infer
-      intercept[AnalysisException] {
-        createFileStream("json", src.getCanonicalPath)
-      }
-      intercept[AnalysisException] {
-        createFileStream("parquet", src.getCanonicalPath)
+          // Should ignore rows that do not have the necessary k column
+          AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null)
+        )
       }
     }
   }
 
+  // =============== file stream globbing tests ================
+
   test("read new files in nested directories with globbing") {
     withTempDirs { case (dir, tmp) =>
 
@@ -518,6 +573,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext
{
     }
   }
 
+  // =============== other tests ================
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message