From: tdas@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-15458][SQL][STREAMING] Disable schema inference for streaming datasets on file streams
Date: Tue, 24 May 2016 21:28:02 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 31fb5fa40 -> 1fb7b3a0a


[SPARK-15458][SQL][STREAMING] Disable schema inference for streaming datasets on file streams

## What changes were proposed in this pull request?

Queries that rely on the schema being inferred in file streams can break easily, for multiple reasons:
- accidentally running on a directory that has no data
- the schema changing underneath the query
- on restart, the query will infer the schema again, and may unexpectedly infer an incorrect schema, as the files in the directory may be different at the time of the restart

To avoid these complicated scenarios, for Spark 2.0 we are going to disable schema inference by default behind a config, so that the user is forced to consider explicitly what schema they want, rather than the system trying to infer it and running into weird corner cases. This PR introduces a SQLConf that determines whether schema inference for file streams is allowed. It is disabled by default.

## How was this patch tested?

Updated unit tests that test the error behavior both with and without schema inference enabled.

Author: Tathagata Das

Closes #13238 from tdas/SPARK-15458.
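In user-facing terms, the change looks roughly like the following sketch, assuming the Spark 2.0 `readStream` API and a hypothetical input directory `/data/in` (neither appears in this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object SchemaInferenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SchemaInferenceExample").getOrCreate()

    // Default after this patch: file streams other than text must be
    // given an explicit schema up front.
    val schema = new StructType().add("c", StringType)
    val withSchema = spark.readStream.schema(schema).json("/data/in")

    // Opting back into inference via the new internal conf (not recommended,
    // for the restart and empty-directory corner cases described above).
    spark.conf.set("spark.sql.streaming.schemaInference", "true")
    val inferred = spark.readStream.json("/data/in")
  }
}
```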
(cherry picked from commit e631b819fe348729aab062207a452b8f1d1511bd)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fb7b3a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fb7b3a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fb7b3a0

Branch: refs/heads/branch-2.0
Commit: 1fb7b3a0a2e3a5c5f784aab662df93fcc1449c36
Parents: 31fb5fa
Author: Tathagata Das
Authored: Tue May 24 14:27:39 2016 -0700
Committer: Tathagata Das
Committed: Tue May 24 14:27:58 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  11 +
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../sql/streaming/FileStreamSourceSuite.scala   | 239 ++++++++++++-------
 3 files changed, 166 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5dd4d8..d0853f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -186,6 +187,16 @@ case class DataSource(
       val path = caseInsensitiveOptions.getOrElse("path", {
         throw new IllegalArgumentException("'path' is not specified")
       })
+      val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+      val isTextSource = providingClass == classOf[text.DefaultSource]
+      // When schema inference is disabled, every source except text requires a user-specified schema
+      if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
+        throw new IllegalArgumentException(
+          "Schema must be specified when creating a streaming source DataFrame. " +
" + + "If some files already exist in the directory, then depending on the file format " + + "you may be able to create a static DataFrame on that directory with " + + "'spark.read.load(directory)' and infer schema from it.") + } SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format)) case _ => http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f3064eb..b91518a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -516,6 +516,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(60 * 1000L) // 10 minutes + val STREAMING_SCHEMA_INFERENCE = + SQLConfigBuilder("spark.sql.streaming.schemaInference") + .internal() + .doc("Whether file-based streaming sources will infer its own schema") + .booleanConf + .createWithDefault(false) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } http://git-wip-us.apache.org/repos/asf/spark/blob/1fb7b3a0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index c97304c..1d784f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -23,6 +23,7 @@ import java.util.UUID import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -165,19 +166,36 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head } + // ============= Basic parameter exists tests ================ + test("FileStreamSource schema: no path") { - val e = intercept[IllegalArgumentException] { - createFileStreamSourceAndGetSchema(format = None, path = None, schema = None) + def testError(): Unit = { + val e = intercept[IllegalArgumentException] { + createFileStreamSourceAndGetSchema(format = None, path = None, schema = None) + } + assert(e.getMessage.contains("path")) // reason is path, not schema } - assert("'path' is not specified" === e.getMessage) + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { testError() } + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() } } - test("FileStreamSource schema: path doesn't exist") { - intercept[AnalysisException] { + test("FileStreamSource schema: path doesn't exist, no schema") { + val e = intercept[IllegalArgumentException] { createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None) } + assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path + } + + test("FileStreamSource schema: path doesn't exist, with schema") { + val userSchema = new StructType().add(new 
StructField("value", IntegerType)) + val schema = createFileStreamSourceAndGetSchema( + format = None, path = Some("/a/b/c"), schema = Some(userSchema)) + assert(schema === userSchema) } + + // =============== Text file stream schema tests ================ + test("FileStreamSource schema: text, no existing files, no schema") { withTempDir { src => val schema = createFileStreamSourceAndGetSchema( @@ -205,13 +223,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } } + // =============== Parquet file stream schema tests ================ + test("FileStreamSource schema: parquet, no existing files, no schema") { withTempDir { src => - val e = intercept[AnalysisException] { - createFileStreamSourceAndGetSchema( - format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema = None) + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + val e = intercept[AnalysisException] { + createFileStreamSourceAndGetSchema( + format = Some("parquet"), + path = Some(new File(src, "1").getCanonicalPath), + schema = None) + } + assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) } - assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) } } @@ -220,9 +244,21 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { Seq("a", "b", "c").toDS().as("userColumn").toDF().write .mode(org.apache.spark.sql.SaveMode.Overwrite) .parquet(src.getCanonicalPath) - val schema = createFileStreamSourceAndGetSchema( - format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None) - assert(schema === new StructType().add("value", StringType)) + + // Without schema inference, should throw error + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { + intercept[IllegalArgumentException] { + createFileStreamSourceAndGetSchema( + format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None) + } + } + + // With schema inference, should infer correct schema + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + val schema = createFileStreamSourceAndGetSchema( + format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None) + assert(schema === new StructType().add("value", StringType)) + } } } @@ -237,22 +273,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } } + // =============== JSON file stream schema tests ================ + test("FileStreamSource schema: json, no existing files, no schema") { withTempDir { src => - val e = intercept[AnalysisException] { - createFileStreamSourceAndGetSchema( - format = Some("json"), path = Some(src.getCanonicalPath), schema = None) + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + + val e = intercept[AnalysisException] { + createFileStreamSourceAndGetSchema( + format = Some("json"), path = Some(src.getCanonicalPath), schema = None) + } + assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) } - assert("Unable to infer schema. 
     }
   }

   test("FileStreamSource schema: json, existing files, no schema") {
     withTempDir { src =>
-      stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
-      val schema = createFileStreamSourceAndGetSchema(
-        format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
-      assert(schema === new StructType().add("c", StringType))
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        stringToFile(new File(src, "1"), "{'c': '1'}\n{'c': '2'}\n{'c': '3'}")
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("json"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("c", StringType))
+      }
     }
   }

@@ -266,6 +319,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== Text file stream tests ================
+
   test("read from text files") {
     withTempDirs { case (src, tmp) =>
       val textStream = createFileStream("text", src.getCanonicalPath)
@@ -284,6 +339,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== JSON file stream tests ================
+
   test("read from json files") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
@@ -313,74 +370,82 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {

   test("read from json files with inferring schema") {
     withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")

-      val fileStream = createFileStream("json", src.getCanonicalPath)
-      assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
+        val fileStream = createFileStream("json", src.getCanonicalPath)
+        assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))

-      // FileStreamSource should infer the column "c"
-      val filtered = fileStream.filter($"c" contains "keep")
+        // FileStreamSource should infer the column "c"
+        val filtered = fileStream.filter($"c" contains "keep")

-      testStream(filtered)(
-        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-        CheckAnswer("keep2", "keep3", "keep5", "keep6")
-      )
+        testStream(filtered)(
+          AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+          CheckAnswer("keep2", "keep3", "keep5", "keep6")
+        )
+      }
     }
   }

   test("reading from json files inside partitioned directory") {
     withTempDirs { case (baseSrc, tmp) =>
-      val src = new File(baseSrc, "type=X")
-      src.mkdirs()
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val src = new File(baseSrc, "type=X")
+        src.mkdirs()

-      // Add a file so that we can infer its schema
-      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+        // Add a file so that we can infer its schema
+        stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
"{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") - val fileStream = createFileStream("json", src.getCanonicalPath) + val fileStream = createFileStream("json", src.getCanonicalPath) - // FileStreamSource should infer the column "c" - val filtered = fileStream.filter($"c" contains "keep") + // FileStreamSource should infer the column "c" + val filtered = fileStream.filter($"c" contains "keep") - testStream(filtered)( - AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), - CheckAnswer("keep2", "keep3", "keep5", "keep6") - ) + testStream(filtered)( + AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6") + ) + } } } test("reading from json files with changing schema") { withTempDirs { case (src, tmp) => + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { - // Add a file so that we can infer its schema - stringToFile(new File(src, "existing"), "{'k': 'value0'}") + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'k': 'value0'}") - val fileStream = createFileStream("json", src.getCanonicalPath) + val fileStream = createFileStream("json", src.getCanonicalPath) - // FileStreamSource should infer the column "k" - assert(fileStream.schema === StructType(Seq(StructField("k", StringType)))) + // FileStreamSource should infer the column "k" + assert(fileStream.schema === StructType(Seq(StructField("k", StringType)))) - // After creating DF and before starting stream, add data with different schema - // Should not affect the inferred schema any more - stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}") + // After creating DF and before starting stream, add data with different schema + // Should not affect the inferred schema any more + stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}") - testStream(fileStream)( + testStream(fileStream)( - // Should not pick up column v in the file added before start - AddTextFileData("{'k': 'value2'}", src, tmp), - CheckAnswer("value0", "value1", "value2"), + // Should not pick up column v in the file added before start + AddTextFileData("{'k': 'value2'}", src, tmp), + CheckAnswer("value0", "value1", "value2"), - // Should read data in column k, and ignore v - AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp), - CheckAnswer("value0", "value1", "value2", "value3"), + // Should read data in column k, and ignore v + AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp), + CheckAnswer("value0", "value1", "value2", "value3"), - // Should ignore rows that do not have the necessary k column - AddTextFileData("{'v': 'value4'}", src, tmp), - CheckAnswer("value0", "value1", "value2", "value3", null)) + // Should ignore rows that do not have the necessary k column + AddTextFileData("{'v': 'value4'}", src, tmp), + CheckAnswer("value0", "value1", "value2", "value3", null)) + } } } + // =============== Parquet file stream tests ================ + test("read from parquet files") { withTempDirs { case (src, tmp) => val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema)) @@ -402,49 +467,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { test("read from parquet files with changing schema") { withTempDirs { case (src, tmp) => - // Add a file so that we can infer its schema - AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp) - - val fileStream = createFileStream("parquet", src.getCanonicalPath) + 
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

-      // FileStreamSource should infer the column "k"
-      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+        // Add a file so that we can infer its schema
+        AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)

-      // After creating DF and before starting stream, add data with different schema
-      // Should not affect the inferred schema any more
-      AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+        val fileStream = createFileStream("parquet", src.getCanonicalPath)

-      testStream(fileStream)(
-        // Should not pick up column v in the file added before start
-        AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
-        CheckAnswer("value0", "value1", "value2"),
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))

-        // Should read data in column k, and ignore v
-        AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3"),
+        // After creating DF and before starting stream, add data with different schema
+        // Should not affect the inferred schema any more
+        AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)

-        // Should ignore rows that do not have the necessary k column
-        AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
-        CheckAnswer("value0", "value1", "value2", "value3", null)
-      )
-    }
-  }
+        testStream(fileStream)(
+          // Should not pick up column v in the file added before start
+          AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
+          CheckAnswer("value0", "value1", "value2"),

-  test("file stream source without schema") {
-    withTempDir { src =>
-      // Only "text" doesn't need a schema
-      createFileStream("text", src.getCanonicalPath)
+          // Should read data in column k, and ignore v
+          AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),

-      // Both "json" and "parquet" require a schema if no existing file to infer
-      intercept[AnalysisException] {
-        createFileStream("json", src.getCanonicalPath)
-      }
-      intercept[AnalysisException] {
-        createFileStream("parquet", src.getCanonicalPath)
+          // Should ignore rows that do not have the necessary k column
+          AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null)
+        )
       }
     }
   }

+  // =============== file stream globbing tests ================
+
   test("read new files in nested directories with globbing") {
     withTempDirs { case (dir, tmp) =>
@@ -518,6 +573,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
   }

+  // =============== other tests ================
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)
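As the new error message suggests, users who want inference-like behavior without enabling spark.sql.streaming.schemaInference can infer the schema once from a static read and reuse it for the stream. A minimal sketch, assuming a SparkSession, the Spark 2.0 `readStream` API, and a hypothetical directory `/data/in` that already contains some files:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("InferFromStaticRead").getOrCreate()

// Infer the schema once with a static (batch) read over the existing files...
val staticDf = spark.read.json("/data/in")

// ...then pin that schema on the stream, so a restart cannot silently
// re-infer a different schema from whatever files are present later.
val streamDf = spark.readStream.schema(staticDf.schema).json("/data/in")
```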